BigWorld's Network Framework
Communication Addresses: Mailbox
In conventional network communication, an address is generally the pair (ip, port). If multiple entity/service instances can be reached at a given (ip, port), the pair can be extended to the triple (ip, port, entity_id/service_id). The anchor in mosaic_game is an implementation of exactly this kind of triple, except that anchor substitutes a server_name for the (ip, port) pair, so it is the pair (server_name, entity_id/service_id). BigWorld likewise has a dedicated address type, Address, which wraps the (ip, port) pair and adds a salt field. Sometimes the salt distinguishes addresses that occupied the same (ip, port) when started at different times, and sometimes it is used to distinguish different Entity types:
class Address
{
public:
/// @name Construction/Destruction
// @{
Address();
Address( uint32 ipArg, uint16 portArg );
// @}
uint32 ip; ///< IP address.
uint16 port; ///< The port.
uint16 salt; ///< Different each time.
int writeToString( char * str, int length ) const;
// TODO: Remove this operator
operator char*() const { return this->c_str(); }
char * c_str() const;
const char * ipAsString() const;
bool isNone() const { return this->ip == 0; }
static Watcher & watcher();
static const Address NONE;
private:
/// Temporary storage used for converting the address to a string. At
/// present we support having two string representations at once.
static const int MAX_STRLEN = 32;
static char s_stringBuf[ 2 ][ MAX_STRLEN ];
static int s_currStringBuf;
static char * nextStringBuf();
};
For the address of an Entity on a specific process, a dedicated EntityMailBoxRef is used; it is simply Address plus an EntityID:
typedef int32 EntityID;
/**
* This structure is a packed version of a mailbox for an entity
*/
class EntityMailBoxRef
{
public:
EntityID id;
Mercury::Address addr;
enum Component
{
CELL = 0,
BASE = 1,
CLIENT = 2,
BASE_VIA_CELL = 3,
CLIENT_VIA_CELL = 4,
CELL_VIA_BASE = 5,
CLIENT_VIA_BASE = 6,
SERVICE = 7
};
EntityMailBoxRef():
id( 0 ),
addr( Mercury::Address::NONE )
{}
bool hasAddress() const { return addr != Mercury::Address::NONE; }
Component component() const { return (Component)(addr.salt >> 13); }
void component( Component c ) { addr.salt = type() | (uint16(c) << 13); }
EntityTypeID type() const { return addr.salt & 0x1FFF; }
void type( EntityTypeID t ) { addr.salt = (addr.salt & 0xE000) | t; }
void init() { id = 0; addr.ip = 0; addr.port = 0; addr.salt = 0; }
void init( EntityID i, const Mercury::Address & a,
Component c, EntityTypeID t )
{ id = i; addr = a; addr.salt = (uint16(c) << 13) | t; }
static const char * componentAsStr( Component component );
const char * componentName() const
{
return componentAsStr( this->component() );
}
};
The Component enum here defines the concrete kind of an EntityMailBoxRef address. There are currently eight kinds, encoded by reusing the top 3 bits of addr.salt:
- CELL: the address of an Entity on a CellApp, and the most direct kind of entity address
- BASE: the address of a Base object on a BaseApp; the Base object controls the corresponding RealEntity
- CLIENT: a client address
- BASE_VIA_CELL: a relay CELL address; once the Entity receives a message delivered to this CELL address, it is automatically forwarded to the corresponding Base object
- CLIENT_VIA_CELL: a relay CELL address; once the Entity receives a message delivered to this CELL address, it is automatically forwarded to the corresponding client object
- CELL_VIA_BASE: a relay BASE address; once the Base receives a message delivered to this BASE address, it is automatically forwarded to the corresponding RealEntity
- CLIENT_VIA_BASE: a relay BASE address; once the Base receives a message delivered to this BASE address, it is automatically forwarded to the corresponding CLIENT
- SERVICE: a service address
EntityMailBoxRef is used purely as a communication address and is too low-level for direct use. The game-logic layer almost always sends messages through PyEntityMailBox instead, because besides storing the concrete address, this type also wraps and converts all data sent to that address:
/**
* This class is used to represent a destination of an entity that messages
* can be sent to.
*
* Its virtual methods are implemented differently on each component.
*/
class PyEntityMailBox: public PyObjectPlus
{
Py_Header( PyEntityMailBox, PyObjectPlus )
public:
PyEntityMailBox( PyTypeObject * pType = &PyEntityMailBox::s_type_ );
virtual ~PyEntityMailBox();
/**
* Get a stream for the remote method to add arguments to.
*
* @param methodDesc The method description.
* @param pHandler If the method requires a request, this is the
* reply handler to use.
*/
virtual BinaryOStream * getStream( const MethodDescription & methodDesc,
std::auto_ptr< Mercury::ReplyMessageHandler > pHandler =
std::auto_ptr< Mercury::ReplyMessageHandler >() ) = 0;
static PyObject * constructFromRef( const EntityMailBoxRef & ref );
static bool reduceToRef( PyObject * pObject, EntityMailBoxRef * pRefOutput );
virtual EntityID id() const = 0;
virtual void address( const Mercury::Address & addr ) = 0;
virtual const Mercury::Address address() const = 0;
virtual void migrate() {}
typedef PyObject * (*FactoryFn)( const EntityMailBoxRef & ref );
static void registerMailBoxComponentFactory(
EntityMailBoxRef::Component c, FactoryFn fn,
PyTypeObject * pType );
typedef bool (*CheckFn)( PyObject * pObject );
typedef EntityMailBoxRef (*ExtractFn)( PyObject * pObject );
static void registerMailBoxRefEquivalent( CheckFn cf, ExtractFn ef );
PY_RO_ATTRIBUTE_DECLARE( this->id(), id );
PyObject * pyGet_address();
PY_RO_ATTRIBUTE_SET( address );
PY_AUTO_METHOD_DECLARE( RETOWN, callMethod,
ARG( ScriptString, ARG( ScriptTuple, END ) ) );
PyObject * callMethod(
const ScriptString & methodName, const ScriptTuple & arguments );
PyObject * callMethod(
const MethodDescription * methodDescription,
const ScriptTuple & args );
// Many function declarations omitted
};
A static function is also provided to construct the concrete PyEntityMailBox subclass object from an EntityMailBoxRef:
/**
* Construct a PyEntityMailBox or equivalent from an EntityMailBoxRef.
* Returns Py_None on failure.
*/
PyObject * PyEntityMailBox::constructFromRef(
const EntityMailBoxRef & ref )
{
if (ref.id == 0) Py_RETURN_NONE;
if (s_pRefReg == NULL) Py_RETURN_NONE;
Fabricators::iterator found = s_pRefReg->fabs_.find( ref.component() );
if (found == s_pRefReg->fabs_.end()) Py_RETURN_NONE;
PyObject * pResult = (*found->second)( ref );
if (pResult)
{
return pResult;
}
else
{
WARNING_MSG( "PyEntityMailBox::constructFromRef: "
"Could not create mailbox from id %d. addr %s. component %d\n",
ref.id, ref.addr.c_str(), ref.component() );
Py_RETURN_NONE;
}
}
/**
* Register a PyEntityMailBox factory
*/
void PyEntityMailBox::registerMailBoxComponentFactory(
EntityMailBoxRef::Component c, FactoryFn fn, PyTypeObject * pType )
{
if (s_pRefReg == NULL) s_pRefReg = new MailBoxRefRegistry();
s_pRefReg->fabs_.insert( std::make_pair( c, fn ) );
s_pRefReg->mailBoxTypes_.push_back( pType );
}
Subclass creation goes through a map that collects all registered subclass factories; the current code registers them manually:
/**
* This class registers our classes into the PyEntityMailBox system,
* and provides some glue/helper functions for it.
*/
static class CellAppPostOfficeAttendant
{
public:
CellAppPostOfficeAttendant()
{
PyEntityMailBox::registerMailBoxComponentFactory(
EntityMailBoxRef::CELL, newCellMB, &CellEntityMailBox::s_type_ );
PyEntityMailBox::registerMailBoxComponentFactory(
EntityMailBoxRef::SERVICE, newBaseMB, &BaseEntityMailBox::s_type_ );
PyEntityMailBox::registerMailBoxComponentFactory(
EntityMailBoxRef::BASE, newBaseMB, &BaseEntityMailBox::s_type_ );
PyEntityMailBox::registerMailBoxComponentFactory(
EntityMailBoxRef::BASE_VIA_CELL, newBaseViaCellMB, &BaseViaCellMailBox::s_type_ );
PyEntityMailBox::registerMailBoxComponentFactory(
EntityMailBoxRef::CELL_VIA_BASE, newCellViaBaseMB, &CellViaBaseMailBox::s_type_ );
PyEntityMailBox::registerMailBoxComponentFactory(
EntityMailBoxRef::CLIENT_VIA_CELL, newClientViaCellMB, &ClientViaCellMailBox::s_type_ );
PyEntityMailBox::registerMailBoxComponentFactory(
EntityMailBoxRef::CLIENT_VIA_BASE, newClientViaBaseMB, &ClientViaBaseMailBox::s_type_ );
PyEntityMailBox::registerMailBoxRefEquivalent(
ServerEntityMailBox::Check, ServerEntityMailBox::static_ref );
PyEntityMailBox::registerMailBoxRefEquivalent(
Entity::Check, cellReduce );
}
// Much code omitted
};
As you can see, every value of the EntityMailBoxRef::Component enum defined earlier has a concrete PyEntityMailBox subclass backing it. Let's take a quick look at one of the most heavily used, CellViaBaseMailBox:
/**
* This class implements a mailbox that can send to a server object. This
* object may be on a cell or may be a base.
*
* @see CellEntityMailBox
* @see BaseEntityMailBox
*/
class ServerEntityMailBox: public PyEntityMailBox
{
Py_Header( ServerEntityMailBox, PyEntityMailBox )
public:
ServerEntityMailBox( EntityTypePtr pBaseType,
const Mercury::Address & addr, EntityID id,
PyTypeObject * pType = &s_type_ );
virtual ~ServerEntityMailBox();
virtual const Mercury::Address address() const { return addr_; }
virtual void address( const Mercury::Address & addr ) { addr_ = addr; }
virtual void migrate();
virtual EntityID id() const { return id_; }
// Some Python-interop code omitted
EntityMailBoxRef ref() const;
virtual EntityMailBoxRef::Component component() const = 0;
const char * componentName() const;
static EntityMailBoxRef static_ref( PyObject * pThis )
{ return ((const ServerEntityMailBox*)pThis)->ref(); }
static void migrateMailBoxes();
static void adjustForDeadBaseApp( const Mercury::Address & deadAddr,
const BackupHash & backupHash );
protected:
Mercury::Address addr_;
EntityID id_;
EntityTypePtr pLocalType_;
};
/**
* This class is common to all mailboxes that send to the base entity or via
* the base entity.
*/
class CommonBaseEntityMailBox : public ServerEntityMailBox
{
Py_Header( CommonBaseEntityMailBox, ServerEntityMailBox )
public:
CommonBaseEntityMailBox( EntityTypePtr pBaseType,
const Mercury::Address & addr, EntityID id,
PyTypeObject * pType = &s_type_ ) :
ServerEntityMailBox( pBaseType, addr, id, pType )
{}
void sendStream();
protected:
Mercury::Bundle & bundle() const;
private:
virtual Mercury::UDPChannel & channel() const;
Mercury::UDPChannel & channel( Entity * pEntity ) const;
};
/**
* This class is used to create a mailbox to a cell entity. Traffic for the
* entity is sent via the base entity instead of directly to the cell entity.
* This means that these mailboxes do not have the restrictions that normal
* cell entity mailboxes have.
*/
class CellViaBaseMailBox : public CommonBaseEntityMailBox
{
Py_Header( CellViaBaseMailBox, CommonBaseEntityMailBox )
public:
CellViaBaseMailBox( EntityTypePtr pBaseType,
const Mercury::Address & addr, EntityID id,
PyTypeObject * pType = &s_type_ ):
CommonBaseEntityMailBox( pBaseType, addr, id, pType )
{}
~CellViaBaseMailBox() { }
virtual ScriptObject pyGetAttribute( const ScriptString & attrObj );
virtual BinaryOStream * getStream( const MethodDescription & methodDesc,
std::auto_ptr< Mercury::ReplyMessageHandler > pHandler );
virtual EntityMailBoxRef::Component component() const;
virtual const MethodDescription * findMethod( const char * attr ) const;
};
CellViaBaseMailBox turns out to have a fairly deep inheritance chain: CellViaBaseMailBox => CommonBaseEntityMailBox => ServerEntityMailBox => PyEntityMailBox. ServerEntityMailBox provides the three members Address, EntityID, and EntityTypePtr, which correspond closely to the components of an EntityMailBoxRef, so it offers a ref function to construct one:
/**
* Get a packed ref representation of this mailbox
*/
EntityMailBoxRef ServerEntityMailBox::ref() const
{
EntityMailBoxRef mbr; mbr.init(
id_, addr_, this->component(), pLocalType_->description().index() );
return mbr;
}
CommonBaseEntityMailBox then introduces the bundle and channel interfaces. A channel is exactly that: a communication channel responsible for reliable message delivery. Note that the channel is bound not to the CommonBaseEntityMailBox itself but to the RealEntity; if the RealEntity cannot be found in the current CellApp, a channel is created from the raw address instead:
/**
* This method returns the most appropriate channel for this mailbox. The
* entity is expected to have already been looked up. It will use the entity
* channel if it can, otherwise it just falls through to the base
* implementation.
*/
Mercury::UDPChannel & CommonBaseEntityMailBox::channel( Entity * pEntity ) const
{
return (pEntity && pEntity->isReal()) ?
pEntity->pReal()->channel() : CellApp::getChannel( addr_ );
}
A bundle represents a single batch of outgoing messages. Each call to bundle() looks up the appropriate channel and returns a reference to that channel's bundle for filling in data:
/**
* This method returns a bundle that will be sent to the base entity. It
* overrides the base behaviour of just returning the channel's bundle by
* prefixing the bundle with a setClient message if we're not sending on the
* entity channel.
*/
Mercury::Bundle & CommonBaseEntityMailBox::bundle() const
{
Entity * pEntity = CellApp::instance().findEntity( id_ );
Mercury::Bundle & bundle = this->channel( pEntity ).bundle();
if (!pEntity || !pEntity->isReal())
{
BaseAppIntInterface::setClientArgs::start( bundle ).id = id_;
}
return bundle;
}
The logic here: if the RealEntity is not found, a setClient message carrying the EntityID of the target Base object is manually prefixed to the bundle. This is the most basic kind of data-stream transformation.
A raw bundle is too free-form to use directly, since anything can be written into it, which makes maintaining message types at the logic layer unpleasant. So CellViaBaseMailBox provides the getStream interface, which initializes a bundle for a specific RPC:
BinaryOStream * CellViaBaseMailBox::getStream(
const MethodDescription & methodDesc,
std::auto_ptr< Mercury::ReplyMessageHandler > pHandler )
{
Mercury::Bundle & bundle = this->bundle();
// Not supporting return values
if (pHandler.get())
{
PyErr_Format( PyExc_TypeError,
"Cannot call two-way method '%s' from CellApp",
methodDesc.name().c_str() );
return NULL;
}
bundle.startMessage( BaseAppIntInterface::callCellMethod );
bundle << methodDesc.internalIndex();
return &bundle;
}
This first calls startMessage to set the outer message type to BaseAppIntInterface::callCellMethod, then writes methodDesc.internalIndex() as the index of the inner wrapped method. When the logic layer later streams arguments into this bundle, it has no idea that the RPC has already been given a BaseAppIntInterface::callCellMethod header; the message conversion happens completely transparently.
Data Streams and Packetization
In BigWorld, data is written into a bundle with std::iostream-style streaming: operator<< fills in data and operator>> parses it back out. The bundle header already provides operator<< support for most basic types, such as bool, int, String, and Vector:
inline BinaryOStream& operator<<( BinaryOStream &out, int64 x )
{
bw_netlonglong n;
n.i64 = x;
return out << n;
}
inline BinaryOStream& operator<<( BinaryOStream &out, char x )
{
bw_netbyte n;
n.c = x;
return out << n;
}
/**
* This method provides output streaming for a string.
*
* @param b The binary stream.
* @param str The string to be streamed.
*
* @return A reference to the stream that was passed in.
*/
inline BinaryOStream & operator<<( BinaryOStream & b, const BW::string & str )
{
b.appendString( str.data(), int(str.length()) );
return b;
}
/**
* This method provides output streaming for a vector.
*
* @param b The binary stream.
* @param data The vector to be streamed.
*
* @return A reference to the stream that was passed in.
*/
template <class T, class A>
inline BinaryOStream & operator<<( BinaryOStream & b,
const BW::vector<T,A> & data)
{
uint32 max = (uint32)data.size();
b << max;
for (uint32 i=0; i < max; i++)
b << data[i];
return b;
}
class Bundle : public BinaryOStream
{
// 省略类型的具体实现
};
However, there is no built-in support for complex containers such as maps, so the CellAppInterface::addCell sample code mentioned earlier has to serialize its map by hand. Below is an example of manually serializing a container: first write the map's size, then serialize each pair, basic element by basic element:
struct DataEntry
{
uint16 key;
BW::string data;
};
typedef BW::map< SpaceEntryID, DataEntry > DataEntries;
DataEntries dataEntries_;
{
// bundle.startMessage( CellAppInterface::allSpaceData );
// bundle << id_;
bundle << (uint32)dataEntries_.size();
DataEntries::const_iterator iter = dataEntries_.begin();
while (iter != dataEntries_.end())
{
bundle << iter->first <<
iter->second.key << iter->second.data;
++iter;
}
}
When the logic layer receives this RPC, it must apply operator>> in exactly the order the operator<< calls were made to parse the arguments correctly:
/**
* This method reads space data from the input stream.
*/
void Space::readDataFromStream( BinaryIStream & stream )
{
int size;
stream >> size;
for (int i = 0; i < size; i++)
{
SpaceEntryID entryID;
uint16 key;
BW::string value;
stream >> entryID >> key >> value;
this->spaceDataEntry( entryID, key, value, DONT_UPDATE_CELL_APP_MGR );
}
}
Having covered serialization and deserialization of the data stream, let's look at how Bundle performs its packetization, restricting ourselves to UDPBundle since that subclass sees the most use. The entry point is startMessage, whose parameters are an InterfaceElement (the RPC's descriptive metadata) and a reliability flag; for now, assume everything is reliable:
/**
* This method starts a new message on the bundle.
*
* @param ie The type of message to start.
* @param reliable True if the message should be reliable.
*/
void UDPBundle::startMessage( const InterfaceElement & ie,
ReliableType reliable )
{
// Piggybacks should only be added immediately before sending.
MF_ASSERT( !pCurrentPacket_->hasFlags( Packet::FLAG_HAS_PIGGYBACKS ) );
MF_ASSERT( ie.name() );
this->endMessage();
curIE_ = ie;
msgIsReliable_ = reliable.isReliable();
msgIsRequest_ = false;
isCritical_ = (reliable == RELIABLE_CRITICAL);
this->newMessage();
reliableDriver_ |= reliable.isDriver();
}
This first calls endMessage to finish packaging the previous logical message, then newMessage to begin a new one. We'll skip the details of endMessage for now and see what newMessage does. It starts by updating some packet statistics, then calls qreserve to reserve a buffer of the right size for the arguments that follow:
/**
* This message begins a new message, with the given number of extra bytes in
* the header. These extra bytes are normally used for request information.
*
* @param extra Number of extra bytes to reserve.
* @return Pointer to the body of the message.
*/
char * UDPBundle::newMessage( int extra )
{
// figure the length of the header
int headerLen = curIE_.headerSize();
if (headerLen == -1)
{
CRITICAL_MSG( "Mercury::UDPBundle::newMessage: "
"tried to add a message with an unknown length format %d\n",
(int)curIE_.lengthStyle() );
}
++numMessages_;
if (msgIsReliable_)
{
++numReliableMessages_;
}
// make space for the header
MessageID * pHeader = (MessageID *)this->qreserve( headerLen + extra );
// set the start of this msg
msgBeg_ = (uint8*)pHeader;
msgChunkOffset_ = Packet::Offset( pCurrentPacket_->msgEndOffset() );
// write in the identifier
*(MessageID*)pHeader = curIE_.id();
// set the length to zero
msgLen_ = 0;
msgExtra_ = extra;
// and return a pointer to the extra data
return (char *)(pHeader + headerLen);
}
Here pHeader is the start of the current message's buffer: the first byte is filled with the message's type id, and the address just past the header is returned. The buffer-reservation interface is qreserve. All the operator<< overloads on the base class BinaryOStream ultimately write their data through the virtual reserve interface, which UDPBundle implements with qreserve:
inline BinaryOStream& operator<<( BinaryOStream &out, bw_netlong x )
{
BW_STATIC_ASSERT( sizeof( bw_netlong ) == 4, bw_netlong_bad_size );
*(uint32*)out.reserve( sizeof( x ) ) = BW_HTONL( x.u32 );
return out;
}
inline BinaryOStream& operator<<( BinaryOStream &out, bw_netlonglong x )
{
BW_STATIC_ASSERT( sizeof( bw_netlonglong ) == 8, bw_netlonglong_bad_size );
*(uint64*)out.reserve( sizeof( x ) ) = BW_HTONLL( x.u64 );
return out;
}
/**
* This method reserves the given number of bytes in this bundle.
*/
INLINE void * UDPBundle::reserve( int nBytes )
{
return qreserve( nBytes );
}
So qreserve is the most important function in serialization, responsible for supplying a buffer large enough for the data that follows. Let's see how it performs dynamic memory allocation:
/**
* This method gets a pointer to this many bytes quickly
* (non-virtual function)
*/
INLINE void * UDPBundle::qreserve( int nBytes )
{
if (nBytes <= pCurrentPacket_->freeSpace())
{
void * writePosition = pCurrentPacket_->back();
pCurrentPacket_->grow( nBytes );
return writePosition;
}
else
{
return this->sreserve( nBytes );
}
}
This shows that the lower-level building block of a Bundle is the Packet, which exists as one contiguous memory buffer; pCurrentPacket_ is the buffer currently in use. If the space remaining in pCurrentPacket_ cannot satisfy the request, sreserve is used to close out the current Packet and allocate a new one as the fresh buffer:
/**
* This function returns a pointer to nBytes on a bundle.
* It assumes that the data will not fit in the current packet,
* so it adds a new one. This is a private function.
*
* @param nBytes Number of bytes to reserve.
*
* @return Pointer to the reserved data.
*/
void * UDPBundle::sreserve( int nBytes )
{
this->endPacket( /* isExtending */ true );
this->startPacket( new Packet() );
void * writePosition = pCurrentPacket_->back();
pCurrentPacket_->grow( nBytes );
MF_ASSERT( pCurrentPacket_->freeSpace() >= 0 );
return writePosition;
}
Internally a Packet uses a fixed-size array of PACKET_MAX_SIZE bytes as its underlying buffer. Packet::back returns the start of the as-yet-unused memory, and grow marks the nBytes after back as used while moving back forward:
#define PACKET_MAX_SIZE 1472
/**
* All packets look like this. Only the data is actually sent;
* the rest is just housekeeping.
*
* @ingroup mercury
*/
class Packet : public ReferenceCount
{
public:
Packet();
~Packet();
Packet * next() { return next_.get(); }
const Packet * next() const { return next_.get(); }
void chain( Packet * pPacket ) { next_ = pPacket; }
public:
char * data() { return data_; }
const char * data() const { return data_; }
/// Returns a pointer to the start of the message data.
const char * body() const { return data_ + HEADER_SIZE; }
/// Returns a pointer to the end of the message data.
char * back() { return data_ + msgEndOffset_; }
int msgEndOffset() const { return msgEndOffset_; }
int bodySize() const { return msgEndOffset_ - HEADER_SIZE; }
int footerSize() const { return footerSize_; }
int totalSize() const { return msgEndOffset_ + footerSize_; }
void msgEndOffset( int offset ) { msgEndOffset_ = offset; }
void grow( int nBytes ) { msgEndOffset_ += nBytes; }
void shrink( int nBytes ) { msgEndOffset_ -= nBytes; }
int freeSpace() const
{
return MAX_SIZE -
RESERVED_FOOTER_SIZE -
msgEndOffset_ -
footerSize_ -
extraFilterSize_;
}
private:
/// Packets are linked together in a simple linked list fashion.
PacketPtr next_;
/// This the offset of the end of the headers and message data. It is
/// temporarily incorrect in two situations: when sending, it is incorrect
/// in NetworkInterface::send() whilst footers are being written, and when
/// receiving, it is incorrect until processOrderedPacket() strips the
/// fragment footers.
int msgEndOffset_;
/// The variable-length data follows the packet header in memory.
char data_[PACKET_MAX_SIZE];
};
As the class declaration shows, consecutive Packets are linked through the next_ pointer into a singly linked list. startPacket therefore uses Packet::chain to append the new Packet after the current pCurrentPacket_, then updates pCurrentPacket_ to the newly allocated packet:
/**
* This method starts a new packet in this bundle.
*/
void UDPBundle::startPacket( Packet * p )
{
Packet * prevPacket = pCurrentPacket_;
// Link the new packet into the chain if necessary.
if (prevPacket)
{
prevPacket->chain( p );
}
pCurrentPacket_ = p;
pCurrentPacket_->reserveFilterSpace( extraSize_ );
pCurrentPacket_->setFlags( 0 );
pCurrentPacket_->msgEndOffset( Packet::HEADER_SIZE );
// if we're in the middle of a message start the next chunk here
msgChunkOffset_ = pCurrentPacket_->msgEndOffset();
}
Although Packet::data reserves PACKET_MAX_SIZE bytes, the actually usable space is smaller, since Packet::HEADER_SIZE, Packet::RESERVED_FOOTER_SIZE, filter space, and so on must be deducted. Overall this keeps a single Packet's real payload under about 1400 bytes; PACKET_MAX_SIZE itself is 1472, which together with the 20-byte IPv4 header and 8-byte UDP header exactly fills the common 1500-byte Ethernet MTU.
What if the data being written is so large that a single Packet cannot hold it? The answer is to string multiple Packets together. For example, addBlob, the interface for appending a large block of binary data, loops and allocates as many Packets as needed:
/**
* This convenience method is used to add a block of memory to this stream.
*/
INLINE
void UDPBundle::addBlob( const void * pBlob, int size )
{
const char * pCurr = (const char *)pBlob;
while (size > 0)
{
// If there isn't any more space on this packet, force a new one to be
// allocated to this bundle.
if (pCurrentPacket_->freeSpace() == 0)
{
this->sreserve( 0 );
}
int currSize = std::min( size, int( pCurrentPacket_->freeSpace() ) );
MF_ASSERT( currSize > 0 );
memcpy( this->qreserve( currSize ), pCurr, currSize );
size -= currSize;
pCurr += currSize;
}
}
Now that we know how Packets are filled, let's revisit endPacket, which has to run before each startPacket; it mainly records some bookkeeping information:
/**
* This method end processing of the current packet, i.e. calculate its
* flags, and the correct size including footers.
*
* @param isExtending True if we are extending the bundle size, false
* otherwise (when we are finalising for send).
*/
void UDPBundle::endPacket( bool isExtending )
{
// If this won't be the last packet, add a reliable order marker
if (isExtending)
{
if (this->isOnExternalChannel())
{
// add a partial reliable order if in the middle of a message
if (msgBeg_ != NULL && msgIsReliable_)
{
this->addReliableOrder();
}
// add a gap reliable order to mark the end of the packet
ReliableOrder rgap = { NULL, 0, 0 };
reliableOrders_.push_back( rgap );
}
}
// if we're in the middle of a message add this chunk
msgLen_ += pCurrentPacket_->msgEndOffset() - msgChunkOffset_;
msgChunkOffset_ = uint16( pCurrentPacket_->msgEndOffset() );
}
When a logical Message has been completely filled in, endMessage is called. It uses compressLength to write the message's total length, msgLen_, into the space at the start of the message's byte stream that was reserved back in startMessage:
/**
* This method finalises a message. It is called from a number of places
* within Bundle when necessary.
*/
void UDPBundle::endMessage( bool isEarlyCall /* = false */ )
{
// nothing to do if no message yet
if (msgBeg_ == NULL)
{
MF_ASSERT( pCurrentPacket_->msgEndOffset() == Packet::HEADER_SIZE ||
hasEndedMsgEarly_ );
return;
}
// add the amt used in this packet to the length
msgLen_ += pCurrentPacket_->msgEndOffset() - msgChunkOffset_;
// fill in headers for this msg
curIE_.compressLength( msgBeg_, msgLen_, this, msgIsRequest_ );
// record its details if it was reliable
if (msgIsReliable_)
{
if (this->isOnExternalChannel())
{
this->addReliableOrder();
}
msgIsReliable_ = false; // for sanity
}
msgChunkOffset_ = Packet::Offset( pCurrentPacket_->msgEndOffset() );
msgBeg_ = NULL;
msgIsRequest_ = false;
hasEndedMsgEarly_ = isEarlyCall;
}
compressLength does not simply drop the message's total length into a fixed uint32_t slot; it uses a variable-length encoding driven by the length information the RPC interface carries:
- If the RPC has fixed-size arguments, no length field is reserved at the start of the message and nothing needs to be filled in.
- If the RPC has variable-size arguments, space is reserved at the start of the message according to lengthParam_, the preset byte width of the maximum argument length; once all of the message's arguments have been written, the length is filled into that reserved space.
- If the RPC has variable-size arguments but the actual size exceeds the integer range that lengthParam_ can represent, the reserved length area is filled entirely with 0xff, an int32 is appended at the end of the bundle, the actual size is written there, and those four bytes are then swapped with the first four bytes of the message in the first packet, so the length field still ends up at the start of the message. The corresponding code is below:
// If the message length could not fit into a standard length field, we
// need to handle this as a special case.
if (oversize)
{
// Fill the original length field with ones to indicate the special
// situation.
static const int IDENTIFIER_SIZE = sizeof(uint8);
for (int i = IDENTIFIER_SIZE; i <= lengthParam_; ++i)
{
((uint8*)data)[i] = 0xff;
}
if (pBundle)
{
void * tail = pBundle->reserve( sizeof( int32 ) );
void * ret = this->specialCompressLength( data, length,
pBundle->pFirstPacket(), isRequest );
MF_ASSERT( !ret || tail == ret );
return ret ? 0 : -1;
}
else
{
return -1;
}
}
/**
* This method is called by InterfaceElement::compressLength when the amount
* of data added to the stream for the message is more than the message's size
* field can handle. For example, if lengthParam is 1 and there is at least
* 255 bytes worth of data added for the message (or 65535 for 2 bytes etc).
*
* To handle this, a 4-byte size is placed at the start of the message
* displacing the first four bytes of the message. These are appended to the
* end of the message. The original length field is filled with 0xff to
* indicate this special situation.
*/
void * InterfaceElement::specialCompressLength( void * data, int length,
Packet * pPacket, bool isRequest ) const;
Matching the length-writing function compressLength, InterfaceElement also provides the corresponding length-parsing function expandLength, the exact inverse of the filling process:
/**
* This method expands a length from the given header.
*
* @param data This is a pointer to a message header.
* @param pPacket
* @param isRequest
*
* @return Expanded length.
*/
int InterfaceElement::expandLength( void * data, Packet * pPacket,
bool isRequest ) const
{
switch (lengthStyle_)
{
case FIXED_LENGTH_MESSAGE:
return lengthParam_;
break;
case VARIABLE_LENGTH_MESSAGE:
{
uint8 *pLen = ((uint8*)data) + sizeof( MessageID );
uint32 len = 0;
switch (lengthParam_)
{
case 0: len = 0; break;
case 1: len = *(uint8*)pLen; break;
case 2:
{
#if defined( BW_ENFORCE_ALIGNED_ACCESS )
uint16 len16 = 0;
memcpy( &len16, pLen, sizeof(uint16) );
len = BW_NTOHS( len16 );
#else // !defined( BW_ENFORCE_ALIGNED_ACCESS )
len = BW_NTOHS( *(uint16 *)pLen );
#endif // defined( BW_ENFORCE_ALIGNED_ACCESS )
break;
}
case 3: len = BW_UNPACK3( (const char*)pLen ); break;
case 4:
{
#if defined( BW_ENFORCE_ALIGNED_ACCESS )
uint32 len32;
memcpy( &len32, pLen, sizeof(uint32) );
len = BW_NTOHL( len32 );
#else // !defined( BW_ENFORCE_ALIGNED_ACCESS )
len = BW_NTOHL( *(uint32*)pLen );
#endif // defined( BW_ENFORCE_ALIGNED_ACCESS )
break;
}
default:
CRITICAL_MSG( "InterfaceElement::expandLength( %s ): "
"Unhandled variable message length: %d\n",
this->c_str(), lengthParam_ );
}
// If lengthParam_ is 4, a length > 0x80000000 will cause an overflow
// and a negative value will be returned from this method.
if ((int)len < 0)
{
ERROR_MSG( "Mercury::InterfaceElement::expandLength( %s ): "
"Overflow in calculating length of variable message!\n",
this->c_str() );
return -1;
}
// The special case is indicated with the length field set to maximum.
// i.e. All bits set to 1.
if (!this->canHandleLength( len ))
{
if (!pPacket)
{
return -1;
}
return this->specialExpandLength( data, pPacket, isRequest );
}
return len;
break;
}
default:
ERROR_MSG( "Mercury::InterfaceElement::expandLength( %s ): "
"unrecognised length format %d\n",
this->c_str(), (int)lengthStyle_ );
break;
}
return -1;
}
To sum up: the smallest unit inside a bundle is the packet, and packets are strung together in a singly linked list. Every packet reserves the same amount of space, sized so that with IP/UDP headers it fits the common Ethernet MTU. All of a Message's data lives inside packets. Each Message begins with a single-byte integer, its unique identifier, followed by the total length of the Message's arguments as a variable-width integer whose byte width is determined by the Message's type. If the message is too long, the oversize path above is taken to record the length.
Sending and Receiving Bundles
Writing a Message does not immediately trigger an actual network send. Sending is managed by Channel::send, which is normally invoked automatically at the end of each frame but can also be called manually:
/**
* This method sends the given bundle on this channel. If no bundle is
* supplied, the channel's own bundle will be sent.
*
* @param pBundle The bundle to send, or NULL to send the channel's own
* bundle.
*/
void Channel::send( Bundle * pBundle /* = NULL */ )
{
ChannelPtr pChannel( this );
if (!this->isConnected())
{
ERROR_MSG( "Channel::send( %s ): Channel is not connected\n",
this->c_str() );
return;
}
if (pBundle == NULL)
{
pBundle = pBundle_;
}
this->doPreFinaliseBundle( *pBundle );
pBundle->finalise();
this->networkInterface().addReplyOrdersTo( *pBundle, this );
this->doSend( *pBundle );
// Clear the bundle
if (pBundle == pBundle_)
{
this->clearBundle();
}
else
{
pBundle->clear();
}
if (pListener_)
{
pListener_->onChannelSend( *this );
}
}
The core here is the doSend function, which internally performs a pile of error checks and, only if everything is fine, finally calls NetworkInterface::send:
/*
* Override from Channel.
*/
void UDPChannel::doSend( Bundle & bundleUncast )
{
MF_ASSERT_DEBUG( dynamic_cast< UDPBundle * >( &bundleUncast ) != NULL );
UDPBundle * pBundle = static_cast< UDPBundle * >( &bundleUncast );
// Much error handling omitted
// Send the bundle through the network interface as UDP packets
pNetworkInterface_->send( addr_, *pBundle, this );
// Update our stats
++numDataUnitsSent_;
numBytesSent_ += pBundle->size();
if (pBundle->isReliable())
{
++numReliablePacketsSent_;
}
// Channels that do not send regularly are added to a collection to do
// their resend checking periodically.
pNetworkInterface_->irregularChannels().addIfNecessary( *this );
// If the bundle that was just sent was critical, the sequence number of
// its last packet is the new unackedCriticalSeq_.
if (pBundle->isCritical())
{
unackedCriticalSeq_ =
pBundle->pFirstPacket()->seq() + pBundle->numDataUnits() - 1;
}
}
/**
* This method sends a bundle to the given address.
*
* Note: any pointers you have into the packet may become invalid after this
* call (and whenever a channel calls this too).
*
* @param address The address to send to.
* @param bundle The bundle to send
* @param pChannel The Channel that is sending the bundle.
* (even if the bundle is not sent reliably, it is still passed
* through the filter associated with the channel).
*/
void NetworkInterface::send( const Address & address,
UDPBundle & bundle, UDPChannel * pChannel )
{
pPacketSender_->send( address, bundle, pChannel );
}
A dedicated PacketSender is responsible for assembling the packets and delivering them to the underlying socket:
/**
* This method sends a bundle to the given address.
*
* Note: any pointers you have into the packet may become invalid after this
* call (and whenever a channel calls this too).
*
* @param address The address to send to.
* @param bundle The bundle to send
* @param pChannel The Channel that is sending the bundle.
* (even if the bundle is not sent reliably, it is still passed
* through the filter associated with the channel).
*/
void PacketSender::send( const Address & address,
UDPBundle & bundle, UDPChannel * pChannel )
{
MF_ASSERT( address != Address::NONE );
MF_ASSERT( !pChannel || pChannel->addr() == address );
MF_ASSERT( !bundle.pChannel() || (bundle.pChannel() == pChannel) );
#if ENABLE_WATCHERS
sendingStats_.mercuryTimer().start();
#endif // ENABLE_WATCHERS
if (!bundle.isFinalised())
{
// Handle bundles sent off-channel that won't have been finalised by
// their channels yet.
bundle.finalise();
bundle.addReplyOrdersTo( &requestManager_, pChannel );
}
// fill in all the footers that are left to us
Packet * pFirstOverflowPacket = bundle.preparePackets( pChannel,
seqNumAllocator_, sendingStats_, shouldUseChecksums_ );
// Finally actually send the darned thing. Do not send overflow packets.
for (Packet * pPacket = bundle.pFirstPacket();
pPacket != pFirstOverflowPacket;
pPacket = pPacket->next())
{
this->sendPacket( address, pPacket, pChannel, false );
}
#if ENABLE_WATCHERS
sendingStats_.mercuryTimer().stop( 1 );
#endif // ENABLE_WATCHERS
sendingStats_.numBundlesSent_++;
sendingStats_.numMessagesSent_ += bundle.numMessages();
sendingStats_.numReliableMessagesSent_ += bundle.numReliableMessages();
}
Here we see a for loop that walks the linked list of Packets and calls sendPacket on each to push it into the socket. There is a restriction, though: a single send call must not push too much data, so bundle.preparePackets computes the first Packet that must not be sent yet, recorded as pFirstOverflowPacket, and the loop stops when it reaches that Packet. preparePackets also writes the Channel's unique identifier (which is in fact the EntityID) into each Packet:
/**
* This method will write the flags on a packet fitting for one that will ride
* on this channel. It will also reserve enough space for the footer.
*/
void UDPChannel::writeFlags( Packet * p )
{
p->enableFlags( Packet::FLAG_ON_CHANNEL );
if (this->isIndexed())
{
p->enableFlags( Packet::FLAG_INDEXED_CHANNEL );
p->channelID() = id_;
p->channelVersion() = version_;
p->reserveFooter( sizeof( ChannelID ) + sizeof( ChannelVersion ) );
}
// much code omitted
}
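The send loop in PacketSender::send walks the packet chain only up to the overflow sentinel. The traversal pattern can be sketched in isolation; FakePacket here is a hypothetical stand-in for Mercury's Packet:

```cpp
#include <cassert>
#include <cstddef>

// Minimal sketch (hypothetical types): walk a singly linked packet list and
// "send" each node until the first overflow packet is reached. A NULL
// sentinel means every packet in the chain gets sent.
struct FakePacket
{
    int         id;
    FakePacket *next;
};

int sendUntilOverflow( FakePacket *pFirst, FakePacket *pFirstOverflow )
{
    int numSent = 0;
    for (FakePacket *p = pFirst; p != pFirstOverflow; p = p->next)
    {
        ++numSent;  // a real implementation would call sendPacket() here
    }
    return numSent;
}
```

Overflow packets keep their place in the chain; they are simply not sent on this pass.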
This sendPacket is still fairly high-level; it performs a good deal of bookkeeping and filter forwarding before finally reaching basicSendWithRetries:
/**
* This method sends a packet. No result is returned as it cannot be trusted.
* The packet may never get to the other end.
*
* @param address The destination address.
* @param pPacket The packet to send.
* @param pChannel The channel to be sent on, or NULL if off-channel.
* @param isResend If true, this is a resend, otherwise, it is the initial
* send.
*/
void PacketSender::sendPacket( const Address & address,
Packet * pPacket,
UDPChannel * pChannel, bool isResend )
{
// some unrelated code omitted
// Check if we want artificial loss or latency
if (!this->rescheduleSend( address, pPacket, pChannel ))
{
this->sendRescheduledPacket( address, pPacket, pChannel );
}
}
/**
* This method sends the packet after rescheduling has occurred.
*
* @param address The destination address.
* @param pPacket The packet.
* @param pChannel The channel, or NULL if off-channel send.
*/
void PacketSender::sendRescheduledPacket( const Address & address,
Packet * pPacket,
UDPChannel * pChannel )
{
PacketFilterPtr pFilter = pChannel ? pChannel->pFilter() : NULL;
if (pPacketMonitor_)
{
pPacketMonitor_->packetOut( address, *pPacket );
}
// Otherwise send as appropriate
if (pFilter)
{
pFilter->send( *this, address, pPacket );
}
else
{
this->basicSendWithRetries( address, pPacket );
}
}
basicSendWithRetries wraps the low-level socket send: it actually attempts to deliver a packet to the given address, and briefly retries if the delivery fails:
/**
* Basic packet sending functionality that retries a few times
* if there are transitory errors.
*
* @param addr The destination address.
* @param pPacket The packet to send.
*
* @return REASON_SUCCESS on success, otherwise an appropriate
* Mercury::Reason.
*/
Reason PacketSender::basicSendWithRetries( const Address & addr,
Packet * pPacket )
{
// try sending a few times
int retries = 0;
Reason reason;
while (retries <= 3)
{
++retries;
#if ENABLE_WATCHERS
sendingStats_.systemTimer().start();
#endif // ENABLE_WATCHERS
reason = this->basicSendSingleTry( addr, pPacket );
#if ENABLE_WATCHERS
sendingStats_.systemTimer().stop( 1 );
#endif // ENABLE_WATCHERS
if (reason == REASON_SUCCESS)
return reason;
// If we've got an error in the queue simply send it again;
// we'll pick up the error later.
if (reason == REASON_NO_SUCH_PORT)
{
continue;
}
// If the transmit queue is full wait 10ms for it to empty.
if ((reason == REASON_RESOURCE_UNAVAILABLE) ||
(reason == REASON_TRANSMIT_QUEUE_FULL))
{
// some error-handling code
continue;
}
// some other error, so don't bother retrying
break;
}
return reason;
}
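The retry policy above, stripped of timers and statistics, amounts to: retry transient failures a bounded number of times, give up immediately on anything else. A minimal sketch (the Reason values and the bound of four attempts mirror the excerpt; the sleep on a full queue is omitted):

```cpp
#include <cassert>

// Simplified reason codes modelled on Mercury::Reason.
enum Reason { REASON_SUCCESS, REASON_NO_SUCH_PORT,
              REASON_TRANSMIT_QUEUE_FULL, REASON_FATAL };

// Retry loop in the spirit of basicSendWithRetries: transient errors are
// retried up to four attempts total; any other error aborts immediately.
template < class TryFn >
Reason sendWithRetries( TryFn tryOnce )
{
    Reason reason = REASON_SUCCESS;
    for (int attempt = 0; attempt < 4; ++attempt)
    {
        reason = tryOnce();
        if (reason == REASON_SUCCESS) return reason;
        if (reason == REASON_NO_SUCH_PORT ||
            reason == REASON_TRANSMIT_QUEUE_FULL)
        {
            continue;   // transient: try again (the real code may wait 10ms)
        }
        break;          // some other error, so don't bother retrying
    }
    return reason;
}
```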
Only the final basicSendSingleTry touches the raw socket, writing the current packet's data into it; with that, the send path is complete:
/**
* Basic packet sending function that just tries to send once.
*
* @param addr The destination address.
* @param pPacket The packet to send.
*
* @return REASON_SUCCESS on success otherwise an appropriate
* Mercury::Reason.
*/
Reason PacketSender::basicSendSingleTry( const Address & addr,
Packet * pPacket )
{
int len = socket_.sendto( pPacket->data(), pPacket->totalSize(),
addr.port, addr.ip );
if (len == pPacket->totalSize())
{
sendingStats_.numBytesSent_ += len + UDP_OVERHEAD;
sendingStats_.numPacketsSent_++;
return REASON_SUCCESS;
}
// error handling omitted
}
Packet sending is managed by PacketSender; likewise, Packet receiving is managed by a dedicated class, PacketReceiver. The receive path has many more layers of wrapping above it, though. Here is a typical receive call stack:
WorldOfWarplanes.exe!Mercury::PacketReceiver::processFilteredPacket(const Mercury::Address & addr={...}, Mercury::Packet * p=0x243399c0, Mercury::ProcessSocketStatsHelper * pStatsHelper=0x0018f2a4) Line 347 + 0x11 bytes C++
WorldOfWarplanes.exe!Mercury::PacketFilter::recv(Mercury::PacketReceiver & receiver={...}, const Mercury::Address & addr={...}, Mercury::Packet * pPacket=0x243399c0, Mercury::ProcessSocketStatsHelper * pStatsHelper=0x0018f2a4) Line 38 C++
WorldOfWarplanes.exe!Mercury::EncryptionFilter::recv(Mercury::PacketReceiver & receiver={...}, const Mercury::Address & addr={...}, Mercury::Packet * pPacket=0x243399c0, Mercury::ProcessSocketStatsHelper * pStatsHelper=0x0018f2a4) Line 233 + 0x1c bytes C++
WorldOfWarplanes.exe!Mercury::PacketReceiver::processPacket(const Mercury::Address & addr={...}, Mercury::Packet * p=0x0493cc40, Mercury::ProcessSocketStatsHelper * pStatsHelper=0x0018f2a4) Line 242 + 0x25 bytes C++
WorldOfWarplanes.exe!Mercury::PacketReceiver::processSocket(bool expectingPacket=true) Line 92 C++
WorldOfWarplanes.exe!Mercury::PacketReceiver::handleInputNotification(int fd=684) Line 51 + 0x9 bytes C++
WorldOfWarplanes.exe!Mercury::SelectPoller::handleInputNotifications(int & countReady=, fd_set & readFDs={...}, fd_set & writeFDs={...}) Line 305 + 0x29 bytes C++
WorldOfWarplanes.exe!Mercury::SelectPoller::processPendingEvents(double maxWait=0.00000000000000000) Line 398 + 0x19 bytes C++
WorldOfWarplanes.exe!Mercury::DispatcherCoupling::doTask() Line 34 + 0x38 bytes C++
WorldOfWarplanes.exe!Mercury::FrequentTasks::process() Line 112 C++
WorldOfWarplanes.exe!Mercury::EventDispatcher::processOnce(bool shouldIdle=false) Line 381 C++
WorldOfWarplanes.exe!ServerConnection::processInput() Line 922 C++
Everything above PacketReceiver::handleInputNotification(int fd=684) is epoll-style polling machinery, which we skip here. handleInputNotification is invoked only when a socket becomes readable; it then calls processSocket repeatedly, receiving one Packet per call, until the remaining readable data in the socket can no longer form a Packet:
/*
* This method is called when there is data on the socket.
*/
int PacketReceiver::handleInputNotification( int fd )
{
uint64 processingStartStamps = BW_NAMESPACE timestamp();
int numPacketsProcessed = 0;
bool expectingPacket = true; // only true for first call to processSocket()
bool shouldProcess = true;
while (shouldProcess)
{
Address sourceAddress;
shouldProcess = this->processSocket( sourceAddress, expectingPacket );
expectingPacket = false;
uint64 processingElapsedStamps =
BW_NAMESPACE timestamp() - processingStartStamps;
++numPacketsProcessed;
// some error handling omitted
}
return 0;
}
processSocket uses Packet::recvFromEndpoint to read data from the socket into the current Packet:
/**
* This method will read and process any pending data on this object's socket.
*
* @param srcAddr This will be filled with the source address of any
* packets received.
* @param expectingPacket If true, a packet was expected to be read,
* otherwise false.
*
* @return True if a packet was read, otherwise false.
*/
bool PacketReceiver::processSocket( Address & srcAddr,
bool expectingPacket )
{
stats_.updateSocketStats( socket_ );
// Used to collect stats
ProcessSocketStatsHelper statsHelper( stats_ );
// try a recvfrom
int len = pNextPacket_->recvFromEndpoint( socket_, srcAddr );
statsHelper.socketReadFinished( len );
if (len <= 0)
{
this->checkSocketErrors( len, expectingPacket );
return false;
}
// process it if it succeeded
PacketPtr curPacket = pNextPacket_;
pNextPacket_ = new Packet();
Reason ret = this->processPacket( srcAddr, curPacket.get(),
&statsHelper );
if ((ret != REASON_SUCCESS) &&
networkInterface_.isVerbose())
{
this->dispatcher().errorReporter().reportException( ret, srcAddr );
}
return true;
}
Packet::recvFromEndpoint reads at most PACKET_MAX_SIZE = 1472 bytes at a time into its internal buffer; this PACKET_MAX_SIZE is also the data size limit of a single Packet:
// The default max size for a packet is the MTU of an ethernet frame, minus the
// overhead of IP and UDP headers. If you have special requirements for packet
// sizes (e.g. your client/server connection is running over VPN) you can edit
// this to whatever you need.
const int Packet::MAX_SIZE = PACKET_MAX_SIZE;
/**
* This method does a recv on the endpoint into this packet's data array,
* setting the length correctly on a successful receive. The return value is
* the return value from the low-level recv() call.
*/
int Packet::recvFromEndpoint( Endpoint & ep, Address & addr )
{
int len = ep.recvfrom( data_, MAX_SIZE,
(u_int16_t*)&addr.port, (u_int32_t*)&addr.ip );
if (len >= 0)
{
this->msgEndOffset( len );
}
return len;
}
It is worth revisiting how UDP receives data. Unlike TCP's byte stream, UDP is received in units of whole datagrams: when a UDP socket becomes readable, a complete datagram has already arrived. When calling recvfrom, the application passes a buffer and its size. If that size is smaller than the datagram, only the first bufsize bytes are delivered and the remainder is silently discarded; the next recvfrom already returns the second datagram. The buffer size must therefore match the per-Packet size limit used when packing, namely PACKET_MAX_SIZE.
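The truncation behaviour can be demonstrated with two plain POSIX UDP sockets over loopback (this is a standalone demo, not BigWorld code; error handling is omitted for brevity):

```cpp
#include <cassert>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>

// Demo of UDP's datagram semantics: if recvfrom()/recv() is given a buffer
// smaller than the waiting datagram, the excess bytes are silently dropped,
// and the next call returns the *next* datagram, not the leftover bytes.
void demoUdpTruncation( char *firstRecv, char *secondRecv,
        int *pLen1, int *pLen2 )
{
    int recvSock = socket( AF_INET, SOCK_DGRAM, 0 );
    int sendSock = socket( AF_INET, SOCK_DGRAM, 0 );

    sockaddr_in addr = {};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl( INADDR_LOOPBACK );
    addr.sin_port = 0;                       // let the kernel pick a port
    bind( recvSock, (sockaddr *)&addr, sizeof( addr ) );

    socklen_t len = sizeof( addr );
    getsockname( recvSock, (sockaddr *)&addr, &len );

    sendto( sendSock, "ABCDEFGH", 8, 0, (sockaddr *)&addr, sizeof( addr ) );
    sendto( sendSock, "12345678", 8, 0, (sockaddr *)&addr, sizeof( addr ) );

    // Receive with a 4-byte buffer: only the first 4 bytes of datagram #1.
    *pLen1 = (int)recv( recvSock, firstRecv, 4, 0 );
    // The next recv() sees datagram #2, not the rest of datagram #1.
    *pLen2 = (int)recv( recvSock, secondRecv, 8, 0 );

    close( recvSock );
    close( sendSock );
}
```

This is exactly why the receive buffer must be at least PACKET_MAX_SIZE bytes: anything smaller would corrupt every oversized Packet.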
So whenever recvFromEndpoint returns a len greater than 0, a whole Packet has been received. Since the application layer works with Messages, the next step is to recover the Messages from the Packet:
/**
* This is the entrypoint for new packets, which just gives it to the filter.
*/
Reason PacketReceiver::processPacket( const Address & addr, Packet * p,
ProcessSocketStatsHelper * pStatsHelper )
{
// some branch handling skipped
//parse raw.
return this->processFilteredPacket( addr, p, pStatsHelper );
}
processFilteredPacket itself is too long to walk through here. All we need to know is that it looks up the corresponding Channel pChannel from the incoming address addr, the RPC interface bound to this PacketReceiver, and the ChannelID carried in the Packet, and then calls addToReceiveWindow:
// much code omitted
// should we be looking in a channel
if (pChannel)
{
UDPChannel::AddToReceiveWindowResult result =
pChannel->addToReceiveWindow( p, addr, stats_ );
}
// much code omitted
Because UDP Packets may arrive out of order, addToReceiveWindow orders the received Packets by sequence number. When the incoming Packet's sequence number matches the expected one, processFilteredPacket goes on to process a run of consecutive Packets:
Reason oret = REASON_SUCCESS;
PacketPtr pCurrPacket = p;
PacketPtr pNextPacket = NULL;
// push this packet chain (frequently one) through processOrderedPacket
// NOTE: We check isCondemned on the channel and not isDead. If a channel
// has isDestroyed set to true but isCondemned false, we still want to
// process remaining messages. This can occur if there is a message that
// causes the entity to teleport. Any remaining messages are still
// processed and will likely be forwarded from the ghost entity to the
// recently teleported real entity.
// TODO: It would be nice to display a message if the channel is condemned
// but there are messages on it.
while (pCurrPacket &&
((pChannel == NULL) || !pChannel->isCondemned()))
{
// processOrderedPacket expects packets not to be chained, since
// chaining is used for grouping fragments into bundles. The packet
// chain we've set up doesn't have anything to do with bundles, so we
// break the packet linkage before passing the packets into
// processOrderedPacket. This can mean that packets that aren't the one
// just received drop their last reference, hence the need for
// pCurrPacket and pNextPacket.
pNextPacket = pCurrPacket->next();
pCurrPacket->chain( NULL );
// Make sure they are actually packets with consecutive sequences.
MF_ASSERT( pNextPacket.get() == NULL ||
seqMask( pCurrPacket->seq() + 1 ) ==
pNextPacket->seq() );
// At this point, the only footers left on the packet should be the
// request and fragment footers.
Reason ret = this->processOrderedPacket( addr, pCurrPacket.get(),
pChannel.get(), pStatsHelper );
if (oret == REASON_SUCCESS)
{
oret = ret;
}
pCurrPacket = pNextPacket;
}
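The reordering idea behind addToReceiveWindow can be sketched with a toy receive window (a hypothetical simplification: the real code additionally wraps sequence numbers with seqMask and bounds the window):

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

// Out-of-order sequence numbers are buffered; when the expected sequence
// finally arrives, the whole consecutive run becomes deliverable in order.
class ToyReceiveWindow
{
public:
    explicit ToyReceiveWindow( uint32_t firstExpected ) :
        expected_( firstExpected ) {}

    // Returns the packets that become deliverable after `seq` arrives.
    std::vector< uint32_t > addToReceiveWindow( uint32_t seq )
    {
        buffered_.insert( seq );
        std::vector< uint32_t > deliverable;
        while (buffered_.erase( expected_ ))
        {
            deliverable.push_back( expected_ );
            ++expected_;    // the real code wraps with seqMask()
        }
        return deliverable;
    }

private:
    uint32_t expected_;
    std::set< uint32_t > buffered_;
};
```

The consecutive run returned here corresponds to the packet chain that the while loop above consumes.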
The while loop here consumes the run of consecutive Packets that have been received, passing each through processOrderedPacket. A single Bundle may be assembled from several Packets, and a single Packet may carry data from several Bundles, so a concept finer-grained than Packet, FragmentedBundle, represents the consecutive chunks belonging to one Bundle; conceptually it resembles a string_view bound to a shared_ptr<Bundle>. A complete Bundle is then made up of a singly linked list of FragmentedBundles. Once all the packets of a Bundle have arrived, UDPBundleProcessor::dispatchMessages dispatches the messages of the now-complete bundle:
/**
* Process a packet after any ordering guaranteed by reliable channels
* has been imposed (further ordering guaranteed by fragmented bundles
* is still to be imposed)
*/
Reason PacketReceiver::processOrderedPacket( const Address & addr, Packet * p,
UDPChannel * pChannel, ProcessSocketStatsHelper * pStatsHelper )
{
// much code omitted
// We have a complete packet chain. We can drop the reference in pChain now
// since the Bundle owns it.
UDPBundleProcessor outputBundle( p );
pChain = NULL;
Reason reason = outputBundle.dispatchMessages(
networkInterface_.interfaceTable(),
addr,
pChannel,
networkInterface_,
pStatsHelper );
if (reason == REASON_CORRUPTED_PACKET)
{
RETURN_FOR_CORRUPTED_PACKET();
}
}
Since a bundle may contain several Messages, dispatchMessages uses a loop to process all the messages it holds:
/**
* This method is responsible for dispatching the messages on this bundle to
* the appropriate handlers.
*
* @param interfaceTable The interface table.
* @param addr The source address of the bundle.
* @param pChannel The channel.
* @param networkInterface The network interface.
* @param pStatsHelper The socket receive statistics.
*
* @return REASON_SUCCESS on success, otherwise an appropriate
* Mercury::Reason describing the error.
*/
Reason UDPBundleProcessor::dispatchMessages( InterfaceTable & interfaceTable,
const Address & addr, UDPChannel * pChannel,
NetworkInterface & networkInterface,
ProcessSocketStatsHelper * pStatsHelper ) const
{
# define SOURCE_STR (pChannel ? pChannel->c_str() : addr.c_str())
bool breakLoop = pChannel ? pChannel->isDead() : false;
Reason ret = REASON_SUCCESS;
// NOTE: The channel may be destroyed while processing the messages so we
// need to hold a local reference to keep pChannel valid.
ChannelPtr pChannelHolder = pChannel;
MessageFilterPtr pMessageFilter =
pChannel ? pChannel->pMessageFilter() : NULL;
// now we simply iterate over the messages in that bundle
iterator iter = this->begin();
iterator end = this->end();
interfaceTable.onBundleStarted( pChannel );
while (iter != end && !breakLoop)
{
// find out what this message looks like
InterfaceElementWithStats & ie = interfaceTable[ iter.msgID() ];
// some code omitted
InterfaceElement updatedIE = ie;
if (!updatedIE.updateLengthDetails( networkInterface, addr ))
{
ERROR_MSG( "UDPBundleProcessor::dispatchMessages( %s ): "
"Discarding bundle after failure to update length "
"details for message ID %hu\n",
SOURCE_STR, (unsigned short int)iter.msgID() );
ret = REASON_CORRUPTED_PACKET;
break;
}
// get the details out of it
UnpackedMessageHeader & header = iter.unpack( updatedIE );
// some code omitted
}
// much code omitted
}
When parsing a single message, the first step is to obtain the length of its arguments; that logic lives in iter.unpack:
/**
* This method unpacks the current message using the given
* interface element.
*
* @param ie InterfaceElement for the current message.
*
* @return Header describing the current message.
*/
UnpackedMessageHeader & UDPBundleProcessor::iterator::unpack(
const InterfaceElement & ie )
{
uint16 msgBeg = offset_;
MF_ASSERT( !isUnpacked_ );
bool isRequest = (nextRequestOffset_ == offset_);
updatedIE_ = ie;
// read the standard header
if (int(offset_) + updatedIE_.headerSize() > int(bodyEndOffset_))
{
ERROR_MSG( "UDPBundleProcessor::iterator::unpack( %s ): "
"Not enough data on stream at %hu for header "
"(%d bytes, needed %d)\n",
updatedIE_.name(), offset_, int(bodyEndOffset_) - int(offset_),
updatedIE_.headerSize() );
goto errorNoRevert;
}
curHeader_.identifier = this->msgID();
curHeader_.length =
updatedIE_.expandLength( cursor_->data() + msgBeg, cursor_, isRequest );
// much code omitted
}
In UDPBundleProcessor::iterator::unpack we finally meet expandLength, the inverse of the compressLength introduced earlier. Once expandLength has run, the message's total length is stored in curHeader_.length. Knowing the total length makes it easy to tell which of the following packets carry data for the current message; once all of those packets have been collected, the function below merges the argument data scattered across them into a single contiguous buffer:
/**
* This method returns the data for the message that the iterator
* is currently pointing to.
*
* @return Pointer to message data.
*/
const char * UDPBundleProcessor::iterator::data()
{
// does this message go off the end of the packet?
if (dataOffset_ + dataLength_ <= bodyEndOffset_)
{
// no, ok, we're safe
return cursor_->data() + dataOffset_;
}
// is there another packet? assert that there is because 'unpack' would have
// flagged an error if the next packet was required but missing
MF_ASSERT( cursor_->next() != NULL );
if (cursor_->next() == NULL) return NULL;
// also assert that data does not start mid-way into the next packet
MF_ASSERT( dataOffset_ <= bodyEndOffset_ ); // (also implied by 'unpack')
// is the entirety of the message data on the next packet?
if (dataOffset_ == bodyEndOffset_ &&
Packet::HEADER_SIZE + dataLength_ <= cursor_->next()->msgEndOffset())
{
// yes, easy then
return cursor_->next()->body();
}
// ok, it's half here and half there, time to make a temporary buffer.
// note that a better idea might be to return a stream from this function.
if (dataBuffer_ != NULL)
{
// Already created a buffer for it.
return dataBuffer_;
}
// Buffer is destroyed in operator++() and in ~iterator().
dataBuffer_ = new char[dataLength_];
Packet *thisPack = cursor_;
uint16 thisOff = dataOffset_;
uint16 thisLen;
for (int len = 0; len < dataLength_; len += thisLen)
{
if (thisPack == NULL)
{
DEBUG_MSG( "UDPBundleProcessor::iterator::data: "
"Run out of packets after %d of %d bytes put in temp\n",
len, dataLength_ );
return NULL;
}
thisLen = thisPack->msgEndOffset() - thisOff;
if (thisLen > dataLength_ - len) thisLen = dataLength_ - len;
memcpy( dataBuffer_ + len, thisPack->data() + thisOff, thisLen );
thisPack = thisPack->next();
thisOff = Packet::HEADER_SIZE;
}
return dataBuffer_;
}
This dataBuffer_ is the final contiguous buffer of argument data; a BinaryIStream can be constructed over it, and all the original arguments parsed back out via operator>>.
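The cross-packet copy loop at the heart of iterator::data() can be sketched independently: gather dataLength bytes that start at some offset in the first chunk and continue, after a fixed header, in each following chunk (a simplified stand-in for the Packet chain, not the real API):

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Gather `dataLength` bytes spread across chunks: the message starts at
// `firstOffset` in chunk 0; in every later chunk its data resumes right
// after that chunk's header of `headerSize` bytes.
std::string gatherAcrossChunks( const std::vector< std::string > & chunks,
        size_t firstOffset, size_t headerSize, size_t dataLength )
{
    std::string out;
    size_t offset = firstOffset;
    for (size_t i = 0; i < chunks.size() && out.size() < dataLength; ++i)
    {
        size_t avail = chunks[ i ].size() - offset;
        size_t take = std::min( avail, dataLength - out.size() );
        out.append( chunks[ i ], offset, take );
        offset = headerSize;   // later chunks start after their header
    }
    return out;
}
```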
Channel and Reliable UDP
In the opening chapters of this book we noted that UDP is not a protocol the application layer can use directly, because datagrams can be lost or reordered at any point during sending, transit, or receipt. For the application to trust the network layer to deliver messages as reliably and in-order as TCP does, the framework must provide a reliable-UDP implementation that papers over the underlying protocol's shortcomings; KCP is a typical example of such a protocol. In BigWorld, most inter-process traffic runs over UDP, so its network layer ships its own reliable UDP, called UDPChannel:
/**
* Channels are used for regular communication channels between two address.
*
* @note Any time you call 'bundle' you may get a different bundle to the one
* you got last time, because the Channel decided that the bundle was full
* enough to send. This does not occur on high latency channels (or else
* tracking numbers would get very confusing).
*
* @note If you use more than one Channel on the same address, they share the
* same bundle. This means that:
*
* @li Messages (and message sequences where used) must be complete between
* calls to 'bundle' (necessary due to note above anyway)
*
* @li Each channel must say send before the bundle is actually sent.
*
* @li Bundle tracking does not work with multiple channels; only the last
* Channel to call 'send' receives a non-zero tracking number (or possibly
* none if deleting a Channel causes it to be sent), and only the first
* Channel on that address receives the 'bundleLost' call.
*
* @ingroup mercury
*/
class UDPChannel : public Channel
{
// 省略很多代码
};
UDPChannel's parent class Channel is a logical data channel that exposes the virtual interface for Bundle-based sending and receiving. Channel has another subclass, TCPChannel, for TCP-based traffic; since TCP is already a reliable protocol, TCPChannel has little to do: it prepends a length field to the Bundle and pushes it onto the send queue. UDPChannel, by contrast, must split a Bundle into multiple Packets and send them to the peer in order, because a single datagram larger than the typical Ethernet MTU would be fragmented, increasing the risk of loss and reordering. The peer must also use a UDPChannel to receive; only when both ends are UDPChannels can the framework's reliable transport be used. Let us now look at roughly how UDPChannel achieves reliability.
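The TCP side's job of "prepend a length field and queue it" is simple enough to sketch in a few lines (a standalone illustration with an assumed 2-byte native-endian length prefix, not the actual TCPChannel wire format):

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Because TCP is a reliable byte stream, framing is the only thing needed:
// a length prefix lets the receiver find message boundaries in the stream.
std::string frame( const std::string & payload )
{
    uint16_t len = (uint16_t)payload.size();
    std::string out( (const char *)&len, sizeof( len ) );
    return out + payload;
}

// Read one framed payload from `stream` starting at *pOffset, advancing it.
std::string unframe( const std::string & stream, size_t * pOffset )
{
    uint16_t len;
    memcpy( &len, stream.data() + *pOffset, sizeof( len ) );
    *pOffset += sizeof( len );
    std::string payload = stream.substr( *pOffset, len );
    *pOffset += len;
    return payload;
}
```

UDPChannel cannot get away with anything this simple, which is what the rest of this section is about.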
Reliable transport over UDP first requires assigning every outgoing packet a consecutive, monotonically increasing sequence number; Packet provides a field for it:
/// Sequence number, or Channel::SEQ_NULL if not set
SeqNum seq_;
This field defaults to 0 when a Packet is first created, and is only filled in when PacketSender::send runs, via UDPBundle::preparePackets:
/**
* This method sends a bundle to the given address.
*
* Note: any pointers you have into the packet may become invalid after this
* call (and whenever a channel calls this too).
*
* @param address The address to send to.
* @param bundle The bundle to send
* @param pChannel The Channel that is sending the bundle.
* (even if the bundle is not sent reliably, it is still passed
* through the filter associated with the channel).
*/
void PacketSender::send( const Address & address,
UDPBundle & bundle, UDPChannel * pChannel )
{
MF_ASSERT( address != Address::NONE );
MF_ASSERT( !pChannel || pChannel->addr() == address );
MF_ASSERT( !bundle.pChannel() || (bundle.pChannel() == pChannel) );
#if ENABLE_WATCHERS
sendingStats_.mercuryTimer().start();
#endif // ENABLE_WATCHERS
if (!bundle.isFinalised())
{
// Handle bundles sent off-channel that won't have been finalised by
// their channels yet.
bundle.finalise();
bundle.addReplyOrdersTo( &requestManager_, pChannel );
}
// fill in all the footers that are left to us
Packet * pFirstOverflowPacket = bundle.preparePackets( pChannel,
seqNumAllocator_, sendingStats_, shouldUseChecksums_ );
// subsequent socket delivery logic omitted
}
The function is rather long; for now we focus on the sequence-number allocation logic:
/**
* This method prepares packets this bundle for sending.
*
* @param pChannel The channel, or NULL for off-channel sending.
* @param seqNumAllocator The network interface's sequence number allocator,
* used for off-channel sending.
* @param sendingStats The sending stats to update.
*/
Packet * UDPBundle::preparePackets( UDPChannel * pChannel,
SeqNumAllocator & seqNumAllocator,
SendingStats & sendingStats,
bool shouldUseChecksums )
{
// fill in all the footers that are left to us
Packet * pFirstOverflowPacket = NULL;
int numPackets = this->numDataUnits();
SeqNum firstSeq = 0;
SeqNum lastSeq = 0;
// Write footers for each packet.
for (Packet * pPacket = this->pFirstPacket();
pPacket;
pPacket = pPacket->next())
{
MF_ASSERT( pPacket->msgEndOffset() >= Packet::HEADER_SIZE );
// much code omitted
this->writeFlags( pPacket );
if (pChannel)
{
pChannel->writeFlags( pPacket );
}
if ((pChannel && pChannel->isExternal()) ||
pPacket->hasFlags( Packet::FLAG_IS_RELIABLE ) ||
pPacket->hasFlags( Packet::FLAG_IS_FRAGMENT ))
{
pPacket->reserveFooter( sizeof( SeqNum ) );
pPacket->enableFlags( Packet::FLAG_HAS_SEQUENCE_NUMBER );
}
// At this point, pPacket->back() is positioned just after the message
// data, so we advance it to the end of where the footers end, then
// write backwards towards the message data. We check that we finish
// up back at the message data as a sanity check.
const int msgEndOffset = pPacket->msgEndOffset();
pPacket->grow( pPacket->footerSize() );
// much code omitted
// Add the sequence number
if (pPacket->hasFlags( Packet::FLAG_HAS_SEQUENCE_NUMBER ))
{
// If we're sending reliable traffic on a channel, use the
// channel's sequence numbers. Otherwise use the nub's.
pPacket->seq() =
(pChannel && pPacket->hasFlags( Packet::FLAG_IS_RELIABLE )) ?
pChannel->useNextSequenceID() :
seqNumAllocator.getNext();
pPacket->packFooter( pPacket->seq() );
if (pPacket == pFirstPacket_)
{
firstSeq = pPacket->seq();
lastSeq = pPacket->seq() + numPackets - 1;
}
}
// much code omitted
}
return pFirstOverflowPacket;
}
preparePackets loops over every Packet in the current Bundle. If the Bundle requires reliable delivery, all of its Packets carry the Packet::FLAG_IS_RELIABLE flag; in that case, reserveFooter reserves four bytes at the end of the Packet for the sequence number to be filled in later, and the Packet::FLAG_HAS_SEQUENCE_NUMBER flag is enabled to mark that a sequence number will be carried. reserveFooter does not allocate any extra dynamic memory; it merely accumulates into footerSize_, the total size of this packet's footer section:
void reserveFooter( int nBytes ) { footerSize_ += nBytes; }
int footerSize() const { return footerSize_; }
Once all the footers have been tallied, Packet::grow pushes the data boundary out past them:
void grow( int nBytes ) { msgEndOffset_ += nBytes; }
There is no danger of msgEndOffset_ overrunning the internal data_ array when growing, because each Packet keeps RESERVED_FOOTER_SIZE bytes, the largest possible footer size, off-limits to message data; this region is subtracted up front when computing the Packet's free space:
/// The amount of space that is reserved for fixed-length footers on a
/// packet. This is done so that the bundle logic can always assume that
/// these footers will fit and not have to worry about pre-allocating them.
/// This is currently 27 bytes, roughly 1.5% of the capacity of a packet, so
/// there's not too much wastage.
static const int RESERVED_FOOTER_SIZE =
sizeof( Offset ) + // FLAG_HAS_REQUESTS
sizeof( AckCount ) + // FLAG_HAS_ACKS
sizeof( SeqNum ) + // FLAG_HAS_SEQUENCE_NUMBER
sizeof( SeqNum ) * 2 + // FLAG_IS_FRAGMENT
sizeof( ChannelID ) + sizeof( ChannelVersion ) + // FLAG_INDEXED_CHANNEL
sizeof( Checksum ); // FLAG_HAS_CHECKSUM
int freeSpace() const
{
return MAX_SIZE -
RESERVED_FOOTER_SIZE -
msgEndOffset_ -
footerSize_ -
extraFilterSize_;
}
Footers are then written through the dedicated packFooter interface, which fills the given value's bytes starting from the tail of data_ and working backwards. After all the footers have been written, msgEndOffset_ once again points at the end of the real message data, and the footerSize_ bytes that follow it are all footer data:
/**
* This method writes a footer to the back of this packet. It should only
* be called from NetworkInterface::send() and assumes that size_ has been
* artificially increased so that it points to the end of the footers, the
* idea being that we work back towards the real body end.
*/
template <class TYPE>
void packFooter( TYPE value )
{
msgEndOffset_ -= sizeof( TYPE );
switch( sizeof( TYPE ) )
{
case sizeof( uint8 ):
*(TYPE*)this->back() = value; break;
case sizeof( uint16 ):
*(TYPE*)this->back() = BW_HTONS( value ); break;
case sizeof( uint32 ):
*(TYPE*)this->back() = BW_HTONL( value ); break;
default:
CRITICAL_MSG( "Footers of size %" PRIzu " aren't supported",
sizeof( TYPE ) );
}
}
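The reserveFooter/grow/packFooter dance can be illustrated with a toy fixed buffer (byte order is ignored here; the real packFooter converts multi-byte values to network order):

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Toy model of the footer mechanism: reserveFooter() only counts bytes,
// grow() moves the end offset past the footer region, and packFooter()
// then writes values backwards from that end toward the message body.
struct ToyPacket
{
    uint8_t data[ 64 ];
    int     msgEndOffset;   // end of message data (and, after grow, footers)
    int     footerSize;

    ToyPacket() : msgEndOffset( 0 ), footerSize( 0 ) {}

    void writeBody( const void * src, int n )
    {
        memcpy( data + msgEndOffset, src, n );
        msgEndOffset += n;
    }
    void reserveFooter( int n ) { footerSize += n; }
    void grow( int n )          { msgEndOffset += n; }

    template < class TYPE >
    void packFooter( TYPE value )
    {
        msgEndOffset -= (int)sizeof( TYPE );
        memcpy( data + msgEndOffset, &value, sizeof( TYPE ) );
    }
};
```

After all footers are packed, msgEndOffset is back at the end of the body, exactly as the comment on the real packFooter describes.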
Later in preparePackets, whenever a packet is found to carry the Packet::FLAG_HAS_SEQUENCE_NUMBER flag, an increasing sequence number is generated and written into the previously reserved slot via packFooter. There are two sources of sequence numbers; in the normal case our RPC packets are Packet::FLAG_IS_RELIABLE, so UDPChannel::useNextSequenceID is used:
/**
* This method returns the next sequence ID, and then increments it.
*
* @return The next sequence ID.
*/
SeqNum UDPChannel::useNextSequenceID()
{
SeqNum retSeq = largeOutSeqAt_.getNext();
if (this->isInternal())
{
int usage = this->sendWindowUsage();
int & threshold = this->sendWindowWarnThreshold();
if (usage > threshold)
{
WARNING_MSG( "UDPChannel::useNextSequenceID( %s ): "
"Send window backlog is now %d packets, "
"exceeded previous max of %d, "
"critical size is %u\n",
this->c_str(), usage, threshold, windowSize_ );
threshold = usage;
}
if (this->isIndexed() &&
(s_pSendWindowCallback_ != NULL) &&
(usage > s_sendWindowCallbackThreshold_))
{
(*s_pSendWindowCallback_)( *this );
}
}
return retSeq;
}
largeOutSeqAt_ is a member of UDPChannel that serves as the increasing sequence-number generator. While generating the next sequence number, the method also checks how many Packets in the send window have not yet gone out; if the count is too large it logs a warning and, where configured, fires the s_pSendWindowCallback_ callback.
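Because these sequence numbers eventually wrap, all comparisons in the channel go through helpers in the style of seqMask and seqLessThan. A sketch of that modular arithmetic follows; the 28-bit sequence space is an assumption for illustration, and BigWorld's actual width may differ, but the wraparound-safe comparison is the point:

```cpp
#include <cassert>
#include <cstdint>

typedef uint32_t SeqNum;

const SeqNum SEQ_SIZE = 0x10000000;   // assumed 2^28 sequence numbers
const SeqNum SEQ_MASK = SEQ_SIZE - 1;

// Keep a value inside the sequence space.
SeqNum seqMask( SeqNum x ) { return x & SEQ_MASK; }

// a < b in modular arithmetic: true when b is ahead of a by less than
// half the sequence space, so ordering survives wraparound.
bool seqLessThan( SeqNum a, SeqNum b )
{
    return (a != b) && (seqMask( b - a ) < SEQ_SIZE / 2);
}
```

This is the same trick as RFC 1982 serial number arithmetic, and it is why the excerpts below always write seqMask( seq + 1 ) instead of a bare increment.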
After preparePackets has filled in each Packet's sequence number, addResendTimer registers every packet that needs reliable delivery as one eligible for retransmission:
// set up the reliable machinery
if (pPacket->hasFlags( Packet::FLAG_IS_RELIABLE ))
{
if (pChannel)
{
const ReliableOrder *roBeg, *roEnd;
if (pChannel->isInternal())
{
roBeg = roEnd = NULL;
}
else
{
this->reliableOrders( pPacket, roBeg, roEnd );
}
if (!pChannel->addResendTimer( pPacket->seq(), pPacket,
roBeg, roEnd ))
{
if (pFirstOverflowPacket == NULL)
{
pFirstOverflowPacket = pPacket;
}
// return REASON_WINDOW_OVERFLOW;
}
else
{
MF_ASSERT( pFirstOverflowPacket == NULL );
}
}
}
addResendTimer wraps each outgoing Packet in an UnackedPacket, records the current time, and stores the UnackedPacket in the UDPChannel's unackedPackets_ array:
/**
* This method records a packet that may need to be resent later if it is not
* acknowledged. It is called when a packet is sent on our behalf.
*
* @return false if the window size was exceeded.
*/
bool UDPChannel::addResendTimer( SeqNum seq, Packet * p,
const ReliableOrder * roBeg, const ReliableOrder * roEnd )
{
MF_ASSERT( (oldestUnackedSeq_ == SEQ_NULL) ||
unackedPackets_[ oldestUnackedSeq_ ] );
MF_ASSERT( seq == p->seq() );
UnackedPacket * pUnackedPacket = new UnackedPacket( p );
// If this channel has no unacked packets, record this as the oldest.
if (oldestUnackedSeq_ == SEQ_NULL)
{
oldestUnackedSeq_ = seq;
}
// Fill it in
pUnackedPacket->lastSentAtOutSeq_ = seq;
uint64 now = timestamp();
pUnackedPacket->lastSentTime_ = now;
lastReliableSendTime_ = now;
pUnackedPacket->wasResent_ = false;
if (roBeg != roEnd)
{
pUnackedPacket->reliableOrders_.assign( roBeg, roEnd );
}
// Grow the unackedPackets_ array, if necessary.
if (seqMask( seq - oldestUnackedSeq_ + 1 ) > unackedPackets_.size())
{
unackedPackets_.doubleSize( oldestUnackedSeq_ );
if (this->networkInterface().isVerbose())
{
INFO_MSG( "UDPChannel::addResendTimer( %s ): "
"Doubled send buffer size to %u\n",
this->c_str(),
unackedPackets_.size() );
}
}
MF_ASSERT( unackedPackets_[ seq ] == NULL );
unackedPackets_[ seq ] = pUnackedPacket;
MF_ASSERT( (oldestUnackedSeq_ == SEQ_NULL) ||
unackedPackets_[ oldestUnackedSeq_ ] );
if (seqMask( largeOutSeqAt_ - oldestUnackedSeq_ ) >= windowSize_)
{
// Make sure that we at least send occasionally.
UnackedPacket * pPrevUnackedPacket =
unackedPackets_[ seqMask( smallOutSeqAt_ - 1 ) ];
if ((pPrevUnackedPacket == NULL) ||
(now - pPrevUnackedPacket->lastSentTime_ >
minInactivityResendDelay_))
{
this->sendUnacked( *unackedPackets_[ smallOutSeqAt_ ] );
smallOutSeqAt_ = seqMask( smallOutSeqAt_ + 1 );
}
this->checkOverflowErrors();
//We shouldn't send now
return false;
}
else
{
//We should send now
smallOutSeqAt_ = largeOutSeqAt_;
return true;
}
}
The second half of this function implements something much like TCP's sliding window: oldestUnackedSeq_ is the smallest sequence number for which no ACK has been received from the peer, largeOutSeqAt_ is the highest sequence number allocated to a sent Packet so far, and smallOutSeqAt_ is the smallest sequence number still waiting to be sent:
uint32 windowSize_;
/// Generally, the sequence number of the next packet to be sent.
SeqNum smallOutSeqAt_; // This does not include packets in
// overflowPackets_
SeqNumAllocator largeOutSeqAt_; // This does include packets in
// overflowPackets_
/// The sequence number of the oldest unacked packet on this channel.
SeqNum oldestUnackedSeq_;
/// The last time a reliable packet was sent (for the first time) on this
/// channel, as a timestamp.
uint64 lastReliableSendTime_;
If the difference between largeOutSeqAt_ and oldestUnackedSeq_ reaches windowSize_, too many packets have been sent without acknowledgement. The current packet is then held back rather than sent immediately, waiting until the window has room, so as not to worsen congestion on an already clogged channel; the function returns false. Before returning, it checks whether any packet has gone out within the recent minInactivityResendDelay_ interval; if not, it forcibly sends the next pending packet and advances smallOutSeqAt_. This nudges the sliding window forward and tells the peer the highest ACK received so far, acting somewhat like a heartbeat.
If the number of sent-but-unACKed packets is modest, the current Packet may go out and the function returns true. Before returning, it sets smallOutSeqAt_ to largeOutSeqAt_, the highest sequence number allocated so far. Under good network conditions there are no unACKed Packets at all, in which case largeOutSeqAt_ is simply the new Packet's sequence number.
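The admission test can be modelled with a toy send window (a hypothetical simplification: wraparound masking, the forced heartbeat send, and the real code's exact off-by-one behaviour are all omitted):

```cpp
#include <cassert>
#include <cstdint>

// A packet may be sent immediately only while the number of in-flight
// (allocated but un-acked) packets stays within windowSize; otherwise it
// becomes an overflow packet and must wait for acks.
struct ToySendWindow
{
    uint32_t windowSize;
    uint32_t oldestUnackedSeq;   // smallest seq not yet acknowledged
    uint32_t largeOutSeqAt;      // next seq to allocate

    // Allocate the next sequence number; report whether it may go out now.
    bool trySend( uint32_t * pSeq )
    {
        *pSeq = largeOutSeqAt++;
        return (largeOutSeqAt - oldestUnackedSeq) <= windowSize;
    }

    // An in-order ack advances the bottom of the window.
    void onAck( uint32_t seq )
    {
        if (seq == oldestUnackedSeq) ++oldestUnackedSeq;
    }
};
```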
Besides managing the sliding window, the sender must also process ACKs for packets it has already sent; that logic lives in handleAck:
/**
* This method removes a packet from the collection of packets that have been
* sent but not acknowledged. It is called when an acknowledgement to a packet
* on this channel is received.
*
* Returns false on error, true otherwise.
*/
bool UDPChannel::handleAck( SeqNum seq )
{
MF_ASSERT( (oldestUnackedSeq_ == SEQ_NULL) ||
unackedPackets_[ oldestUnackedSeq_ ] );
// Make sure the sequence number is valid
// some error handling omitted
// now make sure there's actually a packet there
UnackedPacket * pUnackedPacket = unackedPackets_[ seq ];
if (pUnackedPacket == NULL)
{
return true;
}
// Update the average RTT for this channel, if this packet hadn't already
// been resent.
// RTT-averaging code omitted
// some unrelated code omitted
// If we released the oldest unacked packet, figure out the new one
if (seq == oldestUnackedSeq_)
{
oldestUnackedSeq_ = SEQ_NULL;
for (uint i = seqMask( seq+1 );
i != largeOutSeqAt_;
i = seqMask( i+1 ))
{
if (unackedPackets_[ i ])
{
oldestUnackedSeq_ = i;
break;
}
}
}
// If the incoming seq is after the last ack, then it is the new last ack
if (seqLessThan( highestAck_, seq ))
{
highestAck_ = seq;
}
// Now we can release the unacked packet
bw_safe_delete( pUnackedPacket );
unackedPackets_[ seq ] = NULL;
MF_ASSERT( oldestUnackedSeq_ == SEQ_NULL ||
unackedPackets_[ oldestUnackedSeq_ ] );
while (seqMask(smallOutSeqAt_ - oldestUnackedSeq_) < windowSize_ &&
unackedPackets_[ smallOutSeqAt_ ])
{
this->sendUnacked( *unackedPackets_[ smallOutSeqAt_ ] );
smallOutSeqAt_ = seqMask( smallOutSeqAt_ + 1 );
}
return true;
}
The logic here is straightforward and breaks into four parts:
- If the ACKed packet is the oldest unACKed one, oldestUnackedSeq_, scan forward through the packets sent after it to find the first one still unACKed, and make that the new oldestUnackedSeq_. A plain increment would be wrong because handleAck can run out of order: the ACK for 5 may arrive before the ACK for 4.
- Update highestAck_ to the highest ACK received so far. Note that this does not mean every packet before highestAck_ has been ACKed.
- Release the stored Packet with bw_safe_delete: an ACKed packet can never be retransmitted, so there is no need to keep a copy.
- If the sliding window is not full, send the next pending packet, smallOutSeqAt_, and advance smallOutSeqAt_.
When the peer signals, through whatever mechanism, that every packet before a given sequence number endSeq has been received in order, UDPChannel::handleCumulativeAck is invoked. It takes oldestUnackedSeq_, the smallest unACKed sequence number, and calls handleAck for every packet in the range [oldestUnackedSeq_, endSeq):
/**
* This method handles a cumulative ACK. This indicates that all packets
* BEFORE a sequence number have been received by the remote end.
*
* @return False on error, true otherwise.
*/
bool UDPChannel::handleCumulativeAck( SeqNum endSeq )
{
// Make sure the sequence number is valid
// some error handling omitted
if (!this->hasUnackedPackets())
{
return true;
}
// Check that the ACK is not in the future.
// Note: endSeq is first seqNum after what's been received.
// some error handling omitted
SeqNum seq = oldestUnackedSeq_;
// Note: Up to but not including endSeq
while (seqLessThan( seq, endSeq ))
{
this->handleAck( seq );
seq = seqMask( seq + 1 );
}
return true;
}
Packets that go unACKed for too long must be proactively retransmitted. That logic lives in UDPChannel::checkResendTimers, which is invoked before each packet is sent:
/**
* This method sends the given bundle on this channel. If no bundle is
* supplied, the channel's own bundle will be sent.
*
* @param pBundle The bundle to send, or NULL to send the channel's own
* bundle.
*/
void Channel::send( Bundle * pBundle /* = NULL */ )
{
// 省略很多代码
this->doPreFinaliseBundle( *pBundle );
// 省略很多代码
}
/*
* Override from Channel.
*/
void UDPChannel::doPreFinaliseBundle( Bundle & bundle )
{
// Tack on piggybacks.
UDPBundle & udpBundle = static_cast< UDPBundle & >( bundle );
this->checkResendTimers( udpBundle );
}
/**
* This method resends any unacked packets as appropriate. This can be because
* of time since last sent, receiving later acks before earlier ones.
*/
void UDPChannel::checkResendTimers( UDPBundle & bundle )
{
// There are no un-acked packets
if (oldestUnackedSeq_ == SEQ_NULL)
{
return;
}
// Don't do anything if the remote process has failed
// 忽略对端下线的情况
// If we have unacked packets that are getting a bit old, then resend the
// ones that are older than we'd like. Anything that has taken more than
// twice the RTT on the channel to come back is considered to be too old.
uint64 now = timestamp();
uint64 resendPeriod =
std::max( roundTripTime_*2, minInactivityResendDelay_ );
uint64 lastReliableSendTime = this->lastReliableSendOrResendTime();
const bool isIrregular = !this->isRemoteRegular();
const SeqNum endSeq = isIrregular ? smallOutSeqAt_ : highestAck_;
const bool isDebugVerbose = this->networkInterface().isDebugVerbose();
int numResends = 0;
// TODO: 8 is a magic number and would be nice to be more scientific.
// The idea is to throttle the resends a little in extreme situations. We
// want to send enough so that no (or not too many) packets are lost but
// still be able to send more when the RTT is large.
const int MAX_RESENDS = windowSize_/8;
for (SeqNum seq = oldestUnackedSeq_;
seqLessThan( seq, endSeq ) && numResends < MAX_RESENDS;
seq = seqMask( seq + 1 ))
{
UnackedPacket * pUnacked = unackedPackets_[ seq ];
// Send if the packet is old, or we have a later ack
if (pUnacked != NULL)
{
const bool hasNewerAck =
seqLessThan( pUnacked->lastSentAtOutSeq_, highestAck_);
const bool shouldResend = hasNewerAck ||
(isIrregular && (now - pUnacked->lastSentTime_ > resendPeriod));
const SeqNum prevLastSentAtOutSeq = pUnacked->lastSentAtOutSeq_;
const uint64 prevLastSentTime = pUnacked->lastSentTime_;
if (shouldResend)
{
bool piggybacked = this->resend( seq, bundle );
++numResends;
// 忽略一些警告代码
}
}
}
}
在这个函数里会首先计算出当前能够容忍的最长未确认时间resendPeriod(取两倍RTT与最小重发间隔中的较大者),然后遍历当前所有已发出但未确认的UnackedPacket:如果某个UnackedPacket距离上次发送的时长超过了resendPeriod,或者在它之后发出的包已经先收到了ACK(说明它大概率丢了),则会调用resend将这个Packet重新发送:
/**
* Resends an un-acked packet by the most sensible method available.
*
* @return Returns true if the packet is no longer un-acked.
*/
bool UDPChannel::resend( SeqNum seq, UDPBundle & bundle )
{
++numPacketsResent_;
UnackedPacket & unacked = *unackedPackets_[ seq ];
// If possible, piggypack this packet onto the next outgoing bundle
if (this->isExternal() &&
!unacked.pPacket_->hasFlags( Packet::FLAG_IS_FRAGMENT ) &&
(unackedPackets_[ smallOutSeqAt_ ] == NULL)) // Not going to overflow
{
if (bundle.piggyback(
seq, unacked.reliableOrders_, unacked.pPacket_.get() ))
{
unacked.wasResent_ = true; // Don't count this for RTT calculations
this->handleAck( seq );
return true;
}
}
// If there are any acks on this packet, then they will be resent too, but
// it does no harm.
this->sendUnacked( unacked );
return false;
}
resend最基本的处理路径就是调用sendUnacked把原Packet原样重发一次,并记录这个包的重发时间戳与重发标记:
/**
* Resends an un-acked packet by the most sensible method available.
*/
void UDPChannel::sendUnacked( UnackedPacket & unacked )
{
unacked.pPacket_->updateChannelVersion( version_, id_ );
pNetworkInterface_->sendPacket( addr_, unacked.pPacket_.get(), this,
/* isResend: */ true );
unacked.lastSentAtOutSeq_ = smallOutSeqAt_;
unacked.wasResent_ = true;
uint64 now = timestamp();
unacked.lastSentTime_ = now;
lastReliableResendTime_ = now;
}
如果当前是面向客户端的外部通道(isExternal),且要重发的Packet不是分片(即它自身就承载了完整的bundle),同时滑动窗口还有空位不会溢出,那么这里会通过piggyback函数将当前Packet挂接到即将发送的bundle的Packet链表后面,并在本地对这个序号手动执行一次handleAck。这样的快速路径相当于把要重传的Packet与当前要发送的bundle合并发送了。
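这个快速路径的核心就是把待重传的Packet挂接到即将发出的Packet链表尾部。下面用一个假设的简化结构MiniPacket来示意这种链表合并(真实的Packet还有引用计数、flags、footer等细节):

```cpp
#include <cassert>

// MiniPacket:为演示假设的简化结构,只保留序号和链表指针
struct MiniPacket
{
    int seq;
    MiniPacket* next = nullptr;

    // 对应源码中的链表挂接操作:追加到链表末尾
    void chain(MiniPacket* p)
    {
        MiniPacket* tail = this;
        while (tail->next) tail = tail->next;
        tail->next = p;
    }

    // 统计链表长度,即本次会一起发出的 Packet 数
    int length() const
    {
        int n = 0;
        for (const MiniPacket* p = this; p; p = p->next) ++n;
        return n;
    }
};
```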
至此UDPChannel的发送端实现了序号分配、滑动窗口、接收确认、超时重传等流控和可靠机制,基本等价实现了一个TCP的发送端。接下来我们再来看看UDPChannel的接收端是如何收取Packet并返回ACK的,相关代码在之前我们已经介绍过的PacketReceiver::processFilteredPacket函数里,不过之前只介绍了接收端如何从Packet里解包Bundle的代码,并没有关注ACK的部分。现在我们再来看看这个函数是怎么发出ACK的:
/**
* This function has to be very robust, if we intend to use this transport over
* the big bad internet. We basically have to assume it'll be complete garbage.
*/
Reason PacketReceiver::processFilteredPacket( const Address & addr,
Packet * p, ProcessSocketStatsHelper * pStatsHelper )
{
// 省略很多代码
// now do something if it's reliable
if (p->hasFlags( Packet::FLAG_IS_RELIABLE ))
{
// first make sure it has a sequence number, so we can address it
// 省略一些容错代码
// should we be looking in a channel
if (pChannel)
{
UDPChannel::AddToReceiveWindowResult result =
pChannel->addToReceiveWindow( p, addr, stats_ );
if (!pChannel->isLocalRegular())
{
shouldSendChannel = true;
}
if (result != UDPChannel::PACKET_IS_NEXT_IN_WINDOW)
{
// The packet is not corrupted, and has either already been
// received, or is too early and has been buffered. In either
// case, we send the ACK immediately, as long as the channel is
// established and is irregular.
if (result != UDPChannel::PACKET_IS_CORRUPT)
{
if (pChannel->isEstablished() && shouldSendChannel)
{
UDPBundle emptyBundle;
pChannel->send( &emptyBundle );
}
return REASON_SUCCESS;
}
// The packet has an invalid sequence number.
else
{
RETURN_FOR_CORRUPTED_PACKET();
}
}
}
}
}
这里会根据addToReceiveWindow的返回值来做处理:如果返回值是PACKET_IS_NEXT_IN_WINDOW,则代表这个Packet刚好就是下一个被期望的序号;否则说明这个包要么已经收到过,要么是乱序提前到达并被缓存了,此时会构建一个空的UDPBundle立即发送出去。这个空UDPBundle的作用就是立即对当前包做ACK,不过这里面的调用链其实比较晦涩,我们先跳过之前分析过的UDPChannel::send的调用链的具体内容,直接跳转到其中调用到的UDPBundle::preparePackets:
/**
* This method prepares packets this bundle for sending.
*
* @param pChannel The channel, or NULL for off-channel sending.
* @param seqNumAllocator The network interface's sequence number allocator,
* used for off-channel sending.
* @param sendingStats The sending stats to update.
*/
Packet * UDPBundle::preparePackets( UDPChannel * pChannel,
SeqNumAllocator & seqNumAllocator,
SendingStats & sendingStats,
bool shouldUseChecksums )
{
// fill in all the footers that are left to us
Packet * pFirstOverflowPacket = NULL;
int numPackets = this->numDataUnits();
SeqNum firstSeq = 0;
SeqNum lastSeq = 0;
// Write footers for each packet.
for (Packet * pPacket = this->pFirstPacket();
pPacket;
pPacket = pPacket->next())
{
MF_ASSERT( pPacket->msgEndOffset() >= Packet::HEADER_SIZE );
if (shouldUseChecksums)
{
// Reserve space for the checksum footer
MF_ASSERT( !pPacket->hasFlags( Packet::FLAG_HAS_CHECKSUM ) );
pPacket->reserveFooter( sizeof( Packet::Checksum ) );
pPacket->enableFlags( Packet::FLAG_HAS_CHECKSUM );
}
this->writeFlags( pPacket );
if (pChannel)
{
pChannel->writeFlags( pPacket );
}
// 省略很多代码
}
// 省略很多代码
}
这里的pChannel->writeFlags( pPacket )会在当前Packet还有足够剩余空间的时候将ACK信息携带进去:
/**
* This method will write the flags on a packet fitting for one that will ride
* on this channel. It will also reserve enough space for the footer.
*/
void UDPChannel::writeFlags( Packet * p )
{
p->enableFlags( Packet::FLAG_ON_CHANNEL );
// 忽略无关代码
// Add a cumulative ACK. This indicates that all packets BEFORE a given seq
// have been received.
if (p->freeSpace() >= int( sizeof( SeqNum )))
{
p->enableFlags( Packet::FLAG_HAS_CUMULATIVE_ACK );
p->reserveFooter( sizeof( SeqNum ) );
Acks::iterator iter = acksToSend_.begin();
while (iter != acksToSend_.end())
{
// Need to go through all due to wrap-around case.
if (seqLessThan( *iter, inSeqAt_ ) )
{
acksToSend_.erase( iter++ );
}
else
{
++iter;
}
}
}
// 省略很多代码
}
如果当前Packet里的剩余空间不小于四个字节(即sizeof(SeqNum),对于空包来说这显然成立),这里会优先写入累积ACK的信息:在Packet的flags里加上Packet::FLAG_HAS_CUMULATIVE_ACK,然后在footer里塞入下一个期望收到的连续包序号inSeqAt_。这个inSeqAt_的更新是在UDPChannel::addToReceiveWindow函数里完成的:如果新包的序号正好等于inSeqAt_,则不断递增inSeqAt_,直到已缓存的乱序Packet里不再有这个序号为止:
UDPChannel::AddToReceiveWindowResult UDPChannel::addToReceiveWindow(
Packet * p, const Address & srcAddr, PacketReceiverStats & stats )
{
const SeqNum seq = p->seq();
const bool isDebugVerbose = this->networkInterface().isDebugVerbose();
// 省略很多的代码
// check the good case first
if (seq == inSeqAt_)
{
inSeqAt_ = seqMask( inSeqAt_ + 1 );
Packet * pPrev = p;
Packet * pBufferedPacket = bufferedReceives_[ inSeqAt_ ].get();
// Attach as many buffered packets as possible to this one.
while (pBufferedPacket != NULL)
{
// Link it to the prev packet then remove it from the buffer.
pPrev->chain( pBufferedPacket );
bufferedReceives_[ inSeqAt_ ] = NULL;
--numBufferedReceives_;
// Advance to the next buffered packet.
pPrev = pBufferedPacket;
inSeqAt_ = seqMask( inSeqAt_ + 1 );
pBufferedPacket = bufferedReceives_[ inSeqAt_ ].get();
}
return PACKET_IS_NEXT_IN_WINDOW;
}
// 省略很多代码
}
当确定了要发送累积ACK之后,会从已接收但尚未ACK的序号集合acksToSend_里删除所有比inSeqAt_小的序号,因为它们已经被累积ACK覆盖,不再需要单独确认了。
除了累积ACK之外,还需要在剩余空间里塞入一些离散的ACK信息,这部分代码就在UDPChannel::writeFlags里处理累积ACK的代码后面:
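接收端这套"按序推进inSeqAt_、缓存乱序包、裁剪acksToSend_"的语义,可以用下面的最小模型来示意(MiniReceiver是为演示而假设的简化结构,忽略了序号回绕与固定窗口大小):

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <vector>

typedef uint32_t SeqNum;

struct MiniReceiver
{
    SeqNum inSeqAt = 0;                 // 下一个期望的连续序号
    std::map<SeqNum, int> buffered;     // 乱序缓存:seq -> 包内容
    std::set<SeqNum> acksToSend;        // 待发送的离散ACK
    std::vector<int> delivered;         // 已按序交付的内容

    void receive(SeqNum seq, int payload)
    {
        if (seq == inSeqAt)
        {
            delivered.push_back(payload);
            ++inSeqAt;
            // 把缓存里序号连续的包一并交付
            for (auto it = buffered.find(inSeqAt); it != buffered.end();
                 it = buffered.find(++inSeqAt))
            {
                delivered.push_back(it->second);
                buffered.erase(it);
            }
        }
        else if (seq > inSeqAt)         // 乱序提前到达,先缓存
        {
            buffered[seq] = payload;
            acksToSend.insert(seq);
        }
        // seq < inSeqAt 的重复包直接忽略

        // 累积ACK已覆盖 inSeqAt 之前的序号,不再需要单独确认
        while (!acksToSend.empty() && *acksToSend.begin() < inSeqAt)
            acksToSend.erase(acksToSend.begin());
    }
};
```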
// Put on as many acks as we can.
if (!acksToSend_.empty() &&
p->freeSpace() >= int(sizeof( Packet::AckCount ) + sizeof( SeqNum )))
{
//Required to make GCC link this, something to do with templates
const size_t MAX_ACKS = Packet::MAX_ACKS;
p->enableFlags( Packet::FLAG_HAS_ACKS );
p->reserveFooter( sizeof( Packet::AckCount ) );
const size_t minSpace = p->freeSpace() / sizeof( SeqNum );
const size_t minSpaceSize = std::min( minSpace, acksToSend_.size() );
const size_t nAcks = std::min( minSpaceSize, MAX_ACKS );
p->nAcks() = static_cast<Packet::AckCount>(nAcks);
p->reserveFooter( sizeof( SeqNum ) * p->nAcks() );
}
这里会根据剩余空间的大小来算出当前Packet最多还能携带几个离散的ACK,存储在p->nAcks()里。等到真正执行写入的时候,会从acksToSend_里选择p->nAcks()数量的ACK序号塞入到footer里,同时从acksToSend_删除这些元素:
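这个"取三者最小值"的算式可以抽出来单独看(calcNumAcks是为演示假设的函数,MAX_ACKS取16仅作示例,实际取值以Packet::MAX_ACKS为准):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>

typedef uint32_t SeqNum;

// 由剩余空间、待发ACK数量、协议上限三者共同决定本次能携带几个离散ACK
size_t calcNumAcks(size_t freeSpace, size_t pendingAcks)
{
    const size_t MAX_ACKS = 16;                   // 假设值
    size_t bySpace = freeSpace / sizeof(SeqNum);  // 剩余空间允许的上限
    return std::min(std::min(bySpace, pendingAcks), MAX_ACKS);
}
```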
/**
* This method will write the appropriate flags on a packet to indicate that
* it is on this channel. It must be called after writeFlags.
*/
void UDPChannel::writeFooter( Packet * p )
{
if (p->hasFlags( Packet::FLAG_INDEXED_CHANNEL ))
{
p->packFooter( p->channelID() );
p->packFooter( p->channelVersion() );
}
if (p->hasFlags( Packet::FLAG_HAS_CUMULATIVE_ACK ))
{
p->packFooter( inSeqAt_ );
}
if (p->hasFlags( Packet::FLAG_HAS_ACKS ))
{
// Note: Technically we should start at inSeqAt_ since sequence numbers
// wrap around but this is rare enough not to worry about (since it
// still works but is less efficient).
p->packFooter( (Packet::AckCount)p->nAcks() );
uint acksAdded = 0;
while (!acksToSend_.empty() && acksAdded < p->nAcks())
{
p->packFooter( *acksToSend_.begin() );
acksToSend_.erase( acksToSend_.begin() );
++acksAdded;
}
}
}
因为我们的ACK信息是附带在Packet里的,所以这个Packet丢失之后仍然会通过超时重传发送到对端。这里有一点递归的意思,可靠传输依赖于ACK,而ACK的发送又依赖于可靠传输。
当UDPChannel接收到一个携带了Packet::FLAG_HAS_CUMULATIVE_ACK或者Packet::FLAG_HAS_ACKS的Packet时,就会从footer里解析出这些ACK的包序号,执行之前我们介绍过的handleCumulativeAck和handleAck:
Reason PacketReceiver::processFilteredPacket( const Address & addr,
Packet * p, ProcessSocketStatsHelper * pStatsHelper )
{
// 省略很多代码
if (p->hasFlags( Packet::FLAG_HAS_CUMULATIVE_ACK ))
{
if (!pChannel)
{
// 省略错误日志
}
SeqNum endSeq;
if (!p->stripFooter( endSeq ))
{
// 省略错误日志
}
if (!pChannel->handleCumulativeAck( endSeq ))
{
RETURN_FOR_CORRUPTED_PACKET();
}
}
// Strip and handle ACKs
if (p->hasFlags( Packet::FLAG_HAS_ACKS ))
{
if (!p->stripFooter( p->nAcks() ))
{
// 省略错误日志
}
if (p->nAcks() == 0)
{
// 省略错误日志
}
// The total size of all the ACKs on this packet
int ackSize = p->nAcks() * sizeof( SeqNum );
// check that we have enough footers to account for all of the
// acks the packet claims to have (thanks go to netease)
if (p->bodySize() < ackSize)
{
// 省略错误日志
}
// For each ACK that we receive, we no longer need to store the
// corresponding packet.
if (pChannel)
{
for (uint i=0; i < p->nAcks(); i++)
{
SeqNum seq;
if (!p->stripFooter( seq ))
{
// 省略错误日志
}
if (!pChannel->handleAck( seq ))
{
// 省略错误日志
}
}
}
// 省略很多代码
}
// 省略很多代码
}
到这里,整个ACK的发送和接收机制已经展示完毕。配合超时重传和滑动窗口,当前的UDPChannel基本实现了与TCP类似的可靠传输语义。