BigWorld's Network Framework

Communication Addresses: the Mailbox

In conventional network communication, an address is usually an (ip, port) pair. If several entity/service instances can be reached at the same (ip, port), the pair can be extended to a triple (ip, port, entity_id/service_id). The anchor in mosaic_game is an implementation of such a triple, except that it substitutes a server_name for the (ip, port) pair, giving a (server_name, entity_id/service_id) pair. BigWorld likewise has a dedicated address type, Address, which wraps the (ip, port) pair and adds a salt field; the salt sometimes distinguishes addresses that reuse the same (ip, port) across different process start times, and sometimes doubles as an encoding of the entity type:

class Address
{
public:
	/// @name Construction/Destruction
	// @{
	Address();
	Address( uint32 ipArg, uint16 portArg );
	// @}

	uint32	ip;		///< IP address.
	uint16	port;	///< The port.
	uint16	salt;	///< Different each time.

	int writeToString( char * str, int length ) const;

	// TODO: Remove this operator
	operator char*() const	{ return this->c_str(); }
	char * c_str() const;
	const char * ipAsString() const;

	bool isNone() const			{ return this->ip == 0; }

	static Watcher & watcher();

	static const Address NONE;

private:
	/// Temporary storage used for converting the address to a string.  At
	/// present we support having two string representations at once.
	static const int MAX_STRLEN = 32;
	static char s_stringBuf[ 2 ][ MAX_STRLEN ];
	static int s_currStringBuf;
	static char * nextStringBuf();
};

The address of an Entity on a given process is represented by the dedicated EntityMailBoxRef, which is essentially Address plus an EntityID:

typedef int32 EntityID;
/**
 *	This structure is a packed version of a mailbox for an entity
 */
class EntityMailBoxRef
{
public:
	EntityID			id;
	Mercury::Address	addr;

	enum Component
	{
		CELL = 0,
		BASE = 1,
		CLIENT = 2,
		BASE_VIA_CELL = 3,
		CLIENT_VIA_CELL = 4,
		CELL_VIA_BASE = 5,
		CLIENT_VIA_BASE = 6,
		SERVICE = 7
	};

	EntityMailBoxRef():
		id( 0 ),
		addr( Mercury::Address::NONE )
	{}
	
	bool hasAddress() const 		{ return addr != Mercury::Address::NONE; }

	Component component() const		{ return (Component)(addr.salt >> 13); }
	void component( Component c )	{ addr.salt = type() | (uint16(c) << 13); }

	EntityTypeID type() const		{ return addr.salt & 0x1FFF; }
	void type( EntityTypeID t )		{ addr.salt = (addr.salt & 0xE000) | t; }

	void init() { id = 0; addr.ip = 0; addr.port = 0; addr.salt = 0; }
	void init( EntityID i, const Mercury::Address & a,
		Component c, EntityTypeID t )
	{ id = i; addr = a; addr.salt = (uint16(c) << 13) | t; }

	static const char * componentAsStr( Component component );

	const char * componentName() const
	{
		return componentAsStr( this->component() );
	}
};

The Component enum defines the concrete kind of an EntityMailBoxRef address. There are currently eight kinds, encoded by reusing the top 3 bits of addr.salt:

  1. CELL: the address of an Entity on a CellApp; the most direct kind of entity address
  2. BASE: the address of a Base object on a BaseApp; the Base object controls the corresponding RealEntity
  3. CLIENT: a client address
  4. BASE_VIA_CELL: a relaying CELL address; messages delivered to it are automatically forwarded to the corresponding Base object once the Entity receives them
  5. CLIENT_VIA_CELL: a relaying CELL address; messages delivered to it are automatically forwarded to the corresponding client object once the Entity receives them
  6. CELL_VIA_BASE: a relaying BASE address; messages delivered to it are automatically forwarded to the corresponding RealEntity once the Base receives them
  7. CLIENT_VIA_BASE: a relaying BASE address; messages delivered to it are automatically forwarded to the corresponding CLIENT once the Base receives them
  8. SERVICE: a service address

EntityMailBoxRef serves purely as an address and is too low-level for gameplay code. The business layer sends messages almost exclusively through PyEntityMailBox, which, besides storing the concrete address, also wraps and transforms every piece of data sent to that address:

/**
 *	This class is used to represent a destination of an entity that messages
 *	can be sent to.
 *
 *	Its virtual methods are implemented differently on each component.
 */
class PyEntityMailBox: public PyObjectPlus
{
	Py_Header( PyEntityMailBox, PyObjectPlus )

public:
	PyEntityMailBox( PyTypeObject * pType = &PyEntityMailBox::s_type_ );
	virtual ~PyEntityMailBox();

	/**
	 *	Get a stream for the remote method to add arguments to. 
	 *
	 *	@param methodDesc	The method description.
	 *	@param pHandler		If the method requires a request, this is the
	 *						reply handler to use.
	 */
	virtual BinaryOStream * getStream( const MethodDescription & methodDesc, 
			std::auto_ptr< Mercury::ReplyMessageHandler > pHandler =
				std::auto_ptr< Mercury::ReplyMessageHandler >() ) = 0;
	static PyObject * constructFromRef( const EntityMailBoxRef & ref );
	static bool reduceToRef( PyObject * pObject, EntityMailBoxRef * pRefOutput );

	virtual EntityID id() const = 0;
	virtual void address( const Mercury::Address & addr ) = 0;
	virtual const Mercury::Address address() const = 0;

	virtual void migrate() {}

	typedef PyObject * (*FactoryFn)( const EntityMailBoxRef & ref );
	static void registerMailBoxComponentFactory(
		EntityMailBoxRef::Component c, FactoryFn fn,
		PyTypeObject * pType );

	typedef bool (*CheckFn)( PyObject * pObject );
	typedef EntityMailBoxRef (*ExtractFn)( PyObject * pObject );
	static void registerMailBoxRefEquivalent( CheckFn cf, ExtractFn ef );

	PY_RO_ATTRIBUTE_DECLARE( this->id(), id );
	PyObject * pyGet_address();
	PY_RO_ATTRIBUTE_SET( address );
	
	PY_AUTO_METHOD_DECLARE( RETOWN, callMethod, 
		ARG( ScriptString, ARG( ScriptTuple, END ) ) );
	PyObject * callMethod(
		const ScriptString & methodName, const ScriptTuple & arguments  );

	PyObject * callMethod( 
		const MethodDescription * methodDescription,
		const ScriptTuple & args );
	// many more declarations omitted
};

A static helper is also provided to construct the concrete PyEntityMailBox subtype from an EntityMailBoxRef:

/**
 *	Construct a PyEntityMailBox or equivalent from an EntityMailBoxRef.
 *	Returns Py_None on failure.
 */
PyObject * PyEntityMailBox::constructFromRef(
	const EntityMailBoxRef & ref )
{
	if (ref.id == 0) Py_RETURN_NONE;

	if (s_pRefReg == NULL) Py_RETURN_NONE;

	Fabricators::iterator found = s_pRefReg->fabs_.find( ref.component() );
	if (found == s_pRefReg->fabs_.end()) Py_RETURN_NONE;

	PyObject * pResult = (*found->second)( ref );

	if (pResult)
	{
		return pResult;
	}
	else
	{
		WARNING_MSG( "PyEntityMailBox::constructFromRef: "
				"Could not create mailbox from id %d. addr %s. component %d\n",
				ref.id, ref.addr.c_str(), ref.component() );
		Py_RETURN_NONE;
	}
}

/**
 *	Register a PyEntityMailBox factory
 */
void PyEntityMailBox::registerMailBoxComponentFactory(
	EntityMailBoxRef::Component c, FactoryFn fn, PyTypeObject * pType )
{
	if (s_pRefReg == NULL) s_pRefReg = new MailBoxRefRegistry();
	s_pRefReg->fabs_.insert( std::make_pair( c, fn ) );
	s_pRefReg->mailBoxTypes_.push_back( pType );
}

Subtype construction relies on a map that receives every registered subtype; the current code registers them manually:

/**
 *	This class registers our classes into the PyEntityMailBox system,
 *	and provides some glue/helper functions for it.
 */
static class CellAppPostOfficeAttendant
{
public:
	CellAppPostOfficeAttendant()
	{
		PyEntityMailBox::registerMailBoxComponentFactory(
			EntityMailBoxRef::CELL, newCellMB, &CellEntityMailBox::s_type_ );
		PyEntityMailBox::registerMailBoxComponentFactory(
			EntityMailBoxRef::SERVICE, newBaseMB, &BaseEntityMailBox::s_type_ );
		PyEntityMailBox::registerMailBoxComponentFactory(
			EntityMailBoxRef::BASE, newBaseMB, &BaseEntityMailBox::s_type_ );
		PyEntityMailBox::registerMailBoxComponentFactory(
			EntityMailBoxRef::BASE_VIA_CELL, newBaseViaCellMB, &BaseViaCellMailBox::s_type_ );
		PyEntityMailBox::registerMailBoxComponentFactory(
			EntityMailBoxRef::CELL_VIA_BASE, newCellViaBaseMB, &CellViaBaseMailBox::s_type_ );
		PyEntityMailBox::registerMailBoxComponentFactory(
			EntityMailBoxRef::CLIENT_VIA_CELL, newClientViaCellMB, &ClientViaCellMailBox::s_type_ );
		PyEntityMailBox::registerMailBoxComponentFactory(
			EntityMailBoxRef::CLIENT_VIA_BASE, newClientViaBaseMB, &ClientViaBaseMailBox::s_type_ );
		PyEntityMailBox::registerMailBoxRefEquivalent(
			ServerEntityMailBox::Check, ServerEntityMailBox::static_ref );
		PyEntityMailBox::registerMailBoxRefEquivalent(
			Entity::Check, cellReduce );
	}
	// much code omitted
};

So every value of the EntityMailBoxRef::Component enum defined earlier has a concrete PyEntityMailBox subtype behind it. Let us take a quick look at one of the most heavily used, CellViaBaseMailBox:

/**
 *	This class implements a mailbox that can send to a server object. This
 *	object may be on a cell or may be a base.
 *
 *	@see CellEntityMailBox
 *	@see BaseEntityMailBox
 */
class ServerEntityMailBox: public PyEntityMailBox
{
	Py_Header( ServerEntityMailBox, PyEntityMailBox )

public:
	ServerEntityMailBox( EntityTypePtr pBaseType,
			const Mercury::Address & addr, EntityID id,
			PyTypeObject * pType = &s_type_ );
	virtual ~ServerEntityMailBox();

	virtual const Mercury::Address		address() const		{ return addr_; }
	virtual void address( const Mercury::Address & addr )	{ addr_ = addr; }
	virtual void migrate();

	virtual EntityID			id() const			{ return id_; }

	// some Python-interop code omitted
	EntityMailBoxRef ref() const;
	virtual EntityMailBoxRef::Component component() const = 0;
	const char * componentName() const;

	static EntityMailBoxRef static_ref( PyObject * pThis )
		{ return ((const ServerEntityMailBox*)pThis)->ref(); }

	static void migrateMailBoxes();
	static void adjustForDeadBaseApp( const Mercury::Address & deadAddr,
			const BackupHash & backupHash );

protected:

	Mercury::Address			addr_;
	EntityID					id_;

	EntityTypePtr	pLocalType_;
};

/**
 *	This class is common to all mailboxes that send to the base entity or via
 *	the base entity.
 */
class CommonBaseEntityMailBox : public ServerEntityMailBox
{
	Py_Header( CommonBaseEntityMailBox, ServerEntityMailBox )

public:
	CommonBaseEntityMailBox( EntityTypePtr pBaseType,
			const Mercury::Address & addr, EntityID id,
			PyTypeObject * pType = &s_type_ ) :
		ServerEntityMailBox( pBaseType, addr, id, pType )
	{}
	void sendStream();

protected:
	Mercury::Bundle & bundle() const;

private:
	virtual Mercury::UDPChannel & channel() const;
	Mercury::UDPChannel & channel( Entity * pEntity ) const;
};

/**
 *	This class is used to create a mailbox to a cell entity. Traffic for the
 *	entity is sent via the base entity instead of directly to the cell entity.
 *	This means that these mailboxes do not have the restrictions that normal
 *	cell entity mailboxes have.
 */
class CellViaBaseMailBox : public CommonBaseEntityMailBox
{
	Py_Header( CellViaBaseMailBox, CommonBaseEntityMailBox )

	public:
		CellViaBaseMailBox( EntityTypePtr pBaseType,
					const Mercury::Address & addr, EntityID id,
					PyTypeObject * pType = &s_type_ ):
			CommonBaseEntityMailBox( pBaseType, addr, id, pType )
		{}

		~CellViaBaseMailBox() { }

		virtual ScriptObject pyGetAttribute( const ScriptString & attrObj );
		virtual BinaryOStream * getStream( const MethodDescription & methodDesc,
			std::auto_ptr< Mercury::ReplyMessageHandler > pHandler );
		virtual EntityMailBoxRef::Component component() const;
		virtual const MethodDescription * findMethod( const char * attr ) const;
};

CellViaBaseMailBox turns out to have a fairly deep inheritance chain: CellViaBaseMailBox => CommonBaseEntityMailBox => ServerEntityMailBox => PyEntityMailBox. ServerEntityMailBox supplies the three members Address, EntityID and EntityTypePtr, which map almost one-to-one onto the components of EntityMailBoxRef, so it provides a ref() function to build an EntityMailBoxRef:

/**
 *	Get a packed ref representation of this mailbox
 */
EntityMailBoxRef ServerEntityMailBox::ref() const
{
	EntityMailBoxRef mbr; mbr.init(
		id_, addr_, this->component(), pLocalType_->description().index() );
	return mbr;
}

CommonBaseEntityMailBox then introduces the two interfaces bundle and channel. A channel is exactly that: a communication channel responsible for reliable message delivery. Note that the channel is not bound to the CommonBaseEntityMailBox itself but to the RealEntity; if the RealEntity cannot be found on the current CellApp, a channel is created from the raw address instead:

/**
 *  This method returns the most appropriate channel for this mailbox.  The
 *  entity is expected to have already been looked up.  It will use the entity
 *  channel if it can, otherwise it just falls through to the base
 *  implementation.
 */
Mercury::UDPChannel & CommonBaseEntityMailBox::channel( Entity * pEntity ) const
{
	return (pEntity && pEntity->isReal()) ?
		pEntity->pReal()->channel() : CellApp::getChannel( addr_ );
}

A bundle represents a single batch of outgoing messages. Each call to bundle() obtains the current bundle from the corresponding channel and returns a reference to it for the caller to fill with data:

/**
 *  This method returns a bundle that will be sent to the base entity.  It
 *  overrides the base behaviour of just returning the channel's bundle by
 *  prefixing the bundle with a setClient message if we're not sending on the
 *  entity channel.
 */
Mercury::Bundle & CommonBaseEntityMailBox::bundle() const
{
	Entity * pEntity = CellApp::instance().findEntity( id_ );
	Mercury::Bundle & bundle = this->channel( pEntity ).bundle();

	if (!pEntity || !pEntity->isReal())
	{
		BaseAppIntInterface::setClientArgs::start( bundle ).id = id_;
	}

	return bundle;
}

The logic here is: if no RealEntity is found, the bundle is manually prefixed with a setClient message whose id field is set to the EntityID of the destination Base. This is the most basic kind of data-stream rewriting.

A raw bundle is too permissive: anything can be written into it, which makes message types hard to maintain at the business layer. CellViaBaseMailBox therefore provides the getStream interface, which initialises a bundle for a specific RPC:

BinaryOStream * CellViaBaseMailBox::getStream(
		const MethodDescription & methodDesc,
		std::auto_ptr< Mercury::ReplyMessageHandler > pHandler )
{
	Mercury::Bundle & bundle = this->bundle();

	// Not supporting return values
	if (pHandler.get())
	{
		PyErr_Format( PyExc_TypeError,
				"Cannot call two-way method '%s' from CellApp",
				methodDesc.name().c_str() );
		return NULL;
	}

	bundle.startMessage( BaseAppIntInterface::callCellMethod );
	bundle << methodDesc.internalIndex();

	return &bundle;
}

It first calls startMessage to set the outer message type to BaseAppIntInterface::callCellMethod, then writes methodDesc.internalIndex() to identify the inner, wrapped method. When the business layer later streams arguments into this bundle, it is entirely unaware that the RPC has already been prefixed with a BaseAppIntInterface::callCellMethod header; the message conversion happens invisibly.

Data Streams and Packing

In BigWorld, data is written into a bundle with stream-style processing similar to std::iostream: operator<< writes data in, and operator>> parses it back out. The bundle headers already provide operator<< support for most primitive types, such as bool, int, String and Vector:

inline BinaryOStream& operator<<( BinaryOStream &out, int64 x )
{
	bw_netlonglong n;
	n.i64 = x;
	return out << n;
}

inline BinaryOStream& operator<<( BinaryOStream &out, char x )
{
	bw_netbyte n;
	n.c = x;
	return out << n;
}


/**
 *	This method provides output streaming for a string.
 *
 *	@param b	The binary stream.
 *	@param str	The string to be streamed.
 *
 *	@return A reference to the stream that was passed in.
 */
inline BinaryOStream & operator<<( BinaryOStream & b, const BW::string & str )
{
	b.appendString( str.data(), int(str.length()) );

	return b;
}

/**
 * 	This method provides output streaming for a vector.
 *
 *	@param b	The binary stream.
 *	@param data	The vector to be streamed.
 *
 *	@return A reference to the stream that was passed in.
 */
template <class T, class A>
inline BinaryOStream & operator<<( BinaryOStream & b,
		const BW::vector<T,A> & data)
{
	uint32 max = (uint32)data.size();
	b << max;
	for (uint32 i=0; i < max; i++)
		b << data[i];
	return b;
}
class Bundle : public BinaryOStream
{
	// concrete implementation omitted
};

No support is provided for complex containers such as Map, however, so the CellAppInterface::addCell sample code above has to serialise a Map by hand. Below is an example of manual container serialisation: first write the Map's size, then serialise each Pair element by element:

struct DataEntry
{
	uint16 key;
	BW::string data;
};
typedef BW::map< SpaceEntryID, DataEntry > DataEntries;
DataEntries dataEntries_;

{
	// bundle.startMessage( CellAppInterface::allSpaceData );
	// bundle << id_;
	bundle << (uint32)dataEntries_.size();

	DataEntries::const_iterator iter = dataEntries_.begin();

	while (iter != dataEntries_.end())
	{
		bundle << iter->first <<
			iter->second.key << iter->second.data;
		++iter;
	}
}

When the logic layer receives this RPC, it must apply operator>> in exactly the same order as the earlier operator<< calls to parse the arguments correctly:


/**
 *	This method reads space data from the input stream.
 */
void Space::readDataFromStream( BinaryIStream & stream )
{
	int size;
	stream >> size;

	for (int i = 0; i < size; i++)
	{
		SpaceEntryID entryID;
		uint16 key;
		BW::string value;
		stream >> entryID >> key >> value;

		this->spaceDataEntry( entryID, key, value, DONT_UPDATE_CELL_APP_MGR );
	}
}

Having covered how the data stream is serialised and deserialised, let us look at how the Bundle performs its packing. We only discuss UDPBundle here, since it is the most widely used subclass. Packing starts at startMessage, whose parameters are the RPC's descriptive metadata, an InterfaceElement, and a reliability flag; for now assume every message is reliable:

/**
 * 	This method starts a new message on the bundle.
 *
 * 	@param ie			The type of message to start.
 * 	@param reliable		True if the message should be reliable.
 */
void UDPBundle::startMessage( const InterfaceElement & ie, 
		ReliableType reliable )
{
	// Piggybacks should only be added immediately before sending.
	MF_ASSERT( !pCurrentPacket_->hasFlags( Packet::FLAG_HAS_PIGGYBACKS ) );
	MF_ASSERT( ie.name() );

	this->endMessage();
	curIE_ = ie;
	msgIsReliable_ = reliable.isReliable();
	msgIsRequest_ = false;
	isCritical_ = (reliable == RELIABLE_CRITICAL);
	this->newMessage();

	reliableDriver_ |= reliable.isDriver();
}

startMessage first calls endMessage to finish packing the previous logical message, then newMessage to start a new one. Setting endMessage aside for the moment, let us see what newMessage does. It begins by updating some packet statistics, then calls qreserve to reserve a buffer of the appropriate size for the arguments that follow:

/**
 * 	This message begins a new message, with the given number of extra bytes in
 * 	the header. These extra bytes are normally used for request information.
 *
 * 	@param extra	Number of extra bytes to reserve.
 * 	@return	Pointer to the body of the message.
 */
char * UDPBundle::newMessage( int extra )
{
	// figure the length of the header
	int headerLen = curIE_.headerSize();
	if (headerLen == -1)
	{
		CRITICAL_MSG( "Mercury::UDPBundle::newMessage: "
			"tried to add a message with an unknown length format %d\n",
			(int)curIE_.lengthStyle() );
	}

	++numMessages_;

	if (msgIsReliable_)
	{
		++numReliableMessages_;
	}

	// make space for the header
	MessageID * pHeader = (MessageID *)this->qreserve( headerLen + extra );

	// set the start of this msg
	msgBeg_ = (uint8*)pHeader;
	msgChunkOffset_ = Packet::Offset( pCurrentPacket_->msgEndOffset() );

	// write in the identifier
	*(MessageID*)pHeader = curIE_.id();

	// set the length to zero
	msgLen_ = 0;
	msgExtra_ = extra;

	// and return a pointer to the extra data
	return (char *)(pHeader + headerLen);
}

Here pHeader is the start address of the current message's buffer: the first byte is filled with the message's type id, and the address just past the header is returned. The buffer-reservation interface is qreserve. All the operator<< overloads on the base class BinaryOStream ultimately write their data into the buffer through the virtual reserve interface, and UDPBundle implements that interface with qreserve:

inline BinaryOStream& operator<<( BinaryOStream &out, bw_netlong x )
{
	BW_STATIC_ASSERT( sizeof( bw_netlong ) == 4, bw_netlong_bad_size );
	*(uint32*)out.reserve( sizeof( x ) ) = BW_HTONL( x.u32 );
	return out;
}

inline BinaryOStream& operator<<( BinaryOStream &out, bw_netlonglong x )
{
	BW_STATIC_ASSERT( sizeof( bw_netlonglong ) == 8, bw_netlonglong_bad_size );
	*(uint64*)out.reserve( sizeof( x ) ) = BW_HTONLL( x.u64 );
	return out;
}
/**
 * 	This method reserves the given number of bytes in this bundle.
 */
INLINE void * UDPBundle::reserve( int nBytes )
{
	return qreserve( nBytes );
}

qreserve is therefore the central function of serialisation, responsible for supplying a buffer large enough for the data that follows. Let us see how it performs dynamic memory allocation:

/**
 * 	This method gets a pointer to this many bytes quickly
 * 	(non-virtual function)
 */
INLINE void * UDPBundle::qreserve( int nBytes )
{
	if (nBytes <= pCurrentPacket_->freeSpace())
	{
		void * writePosition = pCurrentPacket_->back();
		pCurrentPacket_->grow( nBytes );
		return writePosition;
	}
	else
	{
		return this->sreserve( nBytes );
	}
}

This shows that the lower-level building block of a Bundle is the Packet, which exists as a contiguous memory buffer; pCurrentPacket_ is the buffer currently in use. If the remaining space in pCurrentPacket_ cannot satisfy the requested size, sreserve closes the current Packet and allocates a new one as the fresh buffer:

/**
 *  This function returns a pointer to nBytes on a bundle.
 *  It assumes that the data will not fit in the current packet,
 *  so it adds a new one. This is a private function.
 *
 *  @param nBytes	Number of bytes to reserve.
 *
 *  @return	Pointer to the reserved data.
 */
void * UDPBundle::sreserve( int nBytes )
{
	this->endPacket( /* isExtending */ true );
	this->startPacket( new Packet() );

	void * writePosition = pCurrentPacket_->back();
	pCurrentPacket_->grow( nBytes );

	MF_ASSERT( pCurrentPacket_->freeSpace() >= 0 );
	return writePosition;
}

Internally a Packet uses a fixed-size array of PACKET_MAX_SIZE bytes as its underlying buffer. Packet::back returns the start address of the as-yet-unused memory, and grow marks the nBytes beyond back as used, moving back forward accordingly:

#define PACKET_MAX_SIZE 1472
/**
 *	All packets look like this. Only the data is actually sent;
 *	the rest is just housekeeping.
 *
 *	@ingroup mercury
 */
class Packet : public ReferenceCount
{
	public:
	Packet();
	~Packet();

	Packet * next()				{ return next_.get(); }
	const Packet * next() const	{ return next_.get(); }

	void chain( Packet * pPacket ) { next_ = pPacket; }
public:	
	char * data() { return data_; }
	const char * data() const { return data_; }

	/// Returns a pointer to the start of the message data.
	const char * body() const { return data_ + HEADER_SIZE; }

	/// Returns a pointer to the end of the message data.
	char * back() { return data_ + msgEndOffset_; }

	int msgEndOffset() const	{ return msgEndOffset_; }
	int bodySize() const		{ return msgEndOffset_ - HEADER_SIZE; }
	int footerSize() const		{ return footerSize_; }
	int totalSize() const		{ return msgEndOffset_ + footerSize_; }

	void msgEndOffset( int offset )		{ msgEndOffset_ = offset; }
	void grow( int nBytes )				{ msgEndOffset_ += nBytes; }
	void shrink( int nBytes )			{ msgEndOffset_ -= nBytes; }

	int freeSpace() const
	{
		return MAX_SIZE -
			RESERVED_FOOTER_SIZE -
			msgEndOffset_ -
			footerSize_ -
			extraFilterSize_;
	}
private:
	/// Packets are linked together in a simple linked list fashion.
	PacketPtr	next_;

	/// This the offset of the end of the headers and message data. It is
	/// temporarily incorrect in two situations: when sending, it is incorrect
	/// in NetworkInterface::send() whilst footers are being written, and when
	/// receiving, it  is incorrect until processOrderedPacket() strips the
	/// fragment footers.
	int			msgEndOffset_;
	/// The variable-length data follows the packet header in memory.
	char			data_[PACKET_MAX_SIZE];
};

The class declaration also shows that consecutive Packets are connected through the next_ pointer, forming a singly linked list. startPacket therefore uses Packet::chain to append the new Packet after the current pCurrentPacket_, then updates pCurrentPacket_ to point at the newly allocated packet:

/**
 *  This method starts a new packet in this bundle.
 */
void UDPBundle::startPacket( Packet * p )
{
	Packet * prevPacket = pCurrentPacket_;

	// Link the new packet into the chain if necessary.
	if (prevPacket)
	{
		prevPacket->chain( p );
	}

	pCurrentPacket_ = p;
	pCurrentPacket_->reserveFilterSpace( extraSize_ );

	pCurrentPacket_->setFlags( 0 );

	pCurrentPacket_->msgEndOffset( Packet::HEADER_SIZE );

	// if we're in the middle of a message start the next chunk here
	msgChunkOffset_ = pCurrentPacket_->msgEndOffset();
}

Although Packet::data reserves PACKET_MAX_SIZE bytes, the genuinely usable portion is smaller, since Packet::HEADER_SIZE, Packet::RESERVED_FOOTER_SIZE, the filter space and other overheads must be deducted. The net effect is that the actual data in a single Packet is kept below roughly 1400 bytes, comfortably within the common 1500-byte Ethernet MTU.

What if the data to be written is so large that a single Packet cannot hold it? The answer is to chain multiple Packets together. For example addBlob, the interface for appending a large block of binary data, loops and allocates as many Packets as necessary:

/**
 *	This convenience method is used to add a block of memory to this stream.
 */
INLINE
void UDPBundle::addBlob( const void * pBlob, int size )
{
	const char * pCurr = (const char *)pBlob;

	while (size > 0)
	{
		// If there isn't any more space on this packet, force a new one to be
		// allocated to this bundle.
		if (pCurrentPacket_->freeSpace() == 0)
		{
			this->sreserve( 0 );
		}

		int currSize = std::min( size, int( pCurrentPacket_->freeSpace() ) );
		MF_ASSERT( currSize > 0 );

		memcpy( this->qreserve( currSize ), pCurr, currSize );
		size -= currSize;
		pCurr += currSize;
	}
}

Having seen how Packets are filled, let us revisit endPacket, which must run before every startPacket; it mainly records bookkeeping information (and, on external channels, reliable-order markers):

/**
 *	This method end processing of the current packet, i.e. calculate its
 *	flags, and the correct size including footers.
 *
 *	@param isExtending	True if we are extending the bundle size, false
 *						otherwise (when we are finalising for send).
 */
void UDPBundle::endPacket( bool isExtending )
{
	// If this won't be the last packet, add a reliable order marker
	if (isExtending)
	{
		if (this->isOnExternalChannel())
		{
			// add a partial reliable order if in the middle of a message
			if (msgBeg_ != NULL && msgIsReliable_)
			{
				this->addReliableOrder();
			}

			// add a gap reliable order to mark the end of the packet
			ReliableOrder rgap = { NULL, 0, 0 };
			reliableOrders_.push_back( rgap );
		}
	}

	// if we're in the middle of a message add this chunk
	msgLen_ += pCurrentPacket_->msgEndOffset() - msgChunkOffset_;
	msgChunkOffset_ = uint16( pCurrentPacket_->msgEndOffset() );
}

When a logical Message has been completely filled in, endMessage is called. It uses compressLength to write the Message's total length msgLen_ into the space at the start of the message byte stream that was reserved back in startMessage:

/**
 * 	This method finalises a message. It is called from a number of places
 *	within Bundle when necessary.
 */
void UDPBundle::endMessage( bool isEarlyCall /* = false */ )
{
	// nothing to do if no message yet
	if (msgBeg_ == NULL)
	{
		MF_ASSERT( pCurrentPacket_->msgEndOffset() == Packet::HEADER_SIZE || 
			hasEndedMsgEarly_ );
		return;
	}

	// add the amt used in this packet to the length
	msgLen_ += pCurrentPacket_->msgEndOffset() - msgChunkOffset_;

	// fill in headers for this msg
	curIE_.compressLength( msgBeg_, msgLen_, this, msgIsRequest_ );

	// record its details if it was reliable
	if (msgIsReliable_)
	{
		if (this->isOnExternalChannel())
		{
			this->addReliableOrder();
		}

		msgIsReliable_ = false;	// for sanity
	}

	msgChunkOffset_ = Packet::Offset( pCurrentPacket_->msgEndOffset() );

	msgBeg_ = NULL;
	msgIsRequest_ = false;

	hasEndedMsgEarly_ = isEarlyCall;
}

compressLength does not simply write the total message length into a fixed uint32_t slot; it uses a variable-length encoding, driven by the length information carried by the RPC interface:

  1. If the RPC has fixed-size arguments, no length field is reserved at the start of the message and nothing needs to be written.
  2. If the RPC has variable-size arguments, space is reserved at the start of the message according to lengthParam_, the preset byte width of the maximum argument length; once all arguments have been streamed in, the length is written back into that reserved space.
  3. If the RPC has variable-size arguments but the actual size exceeds the integer range representable by lengthParam_, the reserved length field is filled entirely with 0xff, an int32 carrying the real size is appended at the end of the bundle, and it is then swapped with the first four bytes of the first packet so that the length field still sits at the start of the message. The corresponding code is shown below:
// If the message length could not fit into a standard length field, we
// need to handle this as a special case.
if (oversize)
{
	// Fill the original length field with ones to indicate the special
	// situation.
	static const int IDENTIFIER_SIZE = sizeof(uint8);
	for (int i = IDENTIFIER_SIZE; i <= lengthParam_; ++i)
	{
		((uint8*)data)[i] = 0xff;
	}

	if (pBundle)
	{
		void * tail = pBundle->reserve( sizeof( int32 ) );
		void * ret = this->specialCompressLength( data, length,
			pBundle->pFirstPacket(), isRequest );
		MF_ASSERT( !ret || tail == ret );
		return ret ? 0 : -1;
	}
	else
	{
		return -1;
	}
}

/**
 *	This method is called by InterfaceElement::compressLength when the amount
 *	of data added to the stream for the message is more than the message's size
 *	field can handle. For example, if lengthParam is 1 and there is at least
 *	255 bytes worth of data added for the message (or 65535 for 2 bytes etc).
 *
 *	To handle this, a 4-byte size is placed at the start of the message
 *	displacing the first four bytes of the message. These are appended to the
 *	end of the message. The original length field is filled with 0xff to
 *	indicate this special situation.
 */
void * InterfaceElement::specialCompressLength( void * data, int length,
		Packet * pPacket, bool isRequest ) const;

Matching the length-writing function compressLength, InterfaceElement also provides the length-parsing function expandLength, which is simply the inverse of the filling process:

/**
 * 	This method expands a length from the given header.
 *
 * 	@param data	This is a pointer to a message header.
 *	@param pPacket
 *	@param isRequest
 *
 * 	@return Expanded length.
 */
int InterfaceElement::expandLength( void * data, Packet * pPacket, 
		bool isRequest ) const
{
	switch (lengthStyle_)
	{
	case FIXED_LENGTH_MESSAGE:
		return lengthParam_;
		break;
	case VARIABLE_LENGTH_MESSAGE:
	{
		uint8 *pLen = ((uint8*)data) + sizeof( MessageID );
		uint32 len = 0;

		switch (lengthParam_)
		{
			case 0: len = 0; break;
			case 1: len = *(uint8*)pLen; break;
			case 2:
			{
#if defined( BW_ENFORCE_ALIGNED_ACCESS )
				uint16 len16 = 0;
				memcpy( &len16, pLen, sizeof(uint16) );
				len = BW_NTOHS( len16 );
#else // !defined( BW_ENFORCE_ALIGNED_ACCESS )
				len = BW_NTOHS( *(uint16 *)pLen );
#endif // defined( BW_ENFORCE_ALIGNED_ACCESS )
				break;
			}
			case 3: len = BW_UNPACK3( (const char*)pLen ); break;
			case 4:
			{
#if defined( BW_ENFORCE_ALIGNED_ACCESS )
				uint32 len32;
				memcpy( &len32, pLen, sizeof(uint32) );
				len = BW_NTOHL( len32 );
#else // !defined( BW_ENFORCE_ALIGNED_ACCESS )
				len = BW_NTOHL( *(uint32*)pLen );
#endif // defined( BW_ENFORCE_ALIGNED_ACCESS )
				break;
			}
			default:
				CRITICAL_MSG( "InterfaceElement::expandLength( %s ): "
					"Unhandled variable message length: %d\n",
					this->c_str(), lengthParam_ );
		}

		// If lengthParam_ is 4, a length > 0x80000000 will cause an overflow
		// and a negative value will be returned from this method.
		if ((int)len < 0)
		{
			ERROR_MSG( "Mercury::InterfaceElement::expandLength( %s ): "
				"Overflow in calculating length of variable message!\n",
				this->c_str() );

			return -1;
		}

		// The special case is indicated with the length field set to maximum.
		// i.e. All bits set to 1.
		if (!this->canHandleLength( len ))
		{
			if (!pPacket)
			{
				return -1;
			}
			return this->specialExpandLength( data, pPacket, isRequest );
		}

		return len;
		break;
	}
	default:
		ERROR_MSG( "Mercury::InterfaceElement::expandLength( %s ): "
			"unrecognised length format %d\n",
			this->c_str(), (int)lengthStyle_ );

		break;
	}
	return -1;
}

To sum up: the smallest building block of a bundle is the packet, and packets are chained through a singly linked list. Every packet reserves the same amount of space, corresponding to the common 1500-byte Ethernet MTU minus the IP and UDP headers. Every Message's data lives inside packets; each Message begins with a single-byte integer identifying the Message type, followed by the total length of the Message's arguments, a variable-width integer whose byte width is determined by the Message type. If the message is too long, the oversize path above is taken to write the length.

Sending and Receiving Bundles

Writing a Message does not immediately trigger a network send. Sending is managed by Channel::send, which is normally invoked automatically at the end of each frame, though it can also be called manually:

/**
 *	This method sends the given bundle on this channel. If no bundle is
 *	supplied, the channel's own bundle will be sent.
 *
 *	@param pBundle 	The bundle to send, or NULL to send the channel's own
 *					bundle.
 */
void Channel::send( Bundle * pBundle /* = NULL */ )
{
	ChannelPtr pChannel( this );

	if (!this->isConnected())
	{
		ERROR_MSG( "Channel::send( %s ): Channel is not connected\n",
			this->c_str() );
		return;
	}

	if (pBundle == NULL)
	{
		pBundle = pBundle_;
	}

	this->doPreFinaliseBundle( *pBundle );

	pBundle->finalise();

	this->networkInterface().addReplyOrdersTo( *pBundle, this );

	this->doSend( *pBundle );

	// Clear the bundle
	if (pBundle == pBundle_)
	{
		this->clearBundle();
	}
	else
	{
		pBundle->clear();
	}

	if (pListener_)
	{
		pListener_->onChannelSend( *this );
	}
}

The heart of this is the doSend function, which performs a batch of error checks internally and, only when everything passes, eventually calls through to NetworkInterface::send:

/*
 *	Override from Channel.
 */
void UDPChannel::doSend( Bundle & bundleUncast )
{
	MF_ASSERT_DEBUG( dynamic_cast< UDPBundle * >( &bundleUncast ) != NULL );
	UDPBundle * pBundle = static_cast< UDPBundle * >( &bundleUncast );

	// extensive error handling omitted

	// Send the bundle through the network interface as UDP packets
	pNetworkInterface_->send( addr_, *pBundle, this );

	// Update our stats
	++numDataUnitsSent_;
	numBytesSent_ += pBundle->size();

	if (pBundle->isReliable())
	{
		++numReliablePacketsSent_;
	}

	// Channels that do not send regularly are added to a collection to do
	// their resend checking periodically.
	pNetworkInterface_->irregularChannels().addIfNecessary( *this );

	// If the bundle that was just sent was critical, the sequence number of
	// its last packet is the new unackedCriticalSeq_.
	if (pBundle->isCritical())
	{
		unackedCriticalSeq_ =
			pBundle->pFirstPacket()->seq() + pBundle->numDataUnits() - 1;
	}
}

/**
 * 	This method sends a bundle to the given address.
 *
 * 	Note: any pointers you have into the packet may become invalid after this
 * 	call (and whenever a channel calls this too).
 *
 * 	@param address	The address to send to.
 * 	@param bundle	The bundle to send
 *	@param pChannel	The Channel that is sending the bundle.
 *				(even if the bundle is not sent reliably, it is still passed
 *				through the filter associated with the channel).
 */
void NetworkInterface::send( const Address & address,
								UDPBundle & bundle, UDPChannel * pChannel )
{
	pPacketSender_->send( address, bundle, pChannel );
}

A dedicated PacketSender is responsible for assembling the packets and handing them to the underlying socket:

/**
 * 	This method sends a bundle to the given address.
 *
 * 	Note: any pointers you have into the packet may become invalid after this
 * 	call (and whenever a channel calls this too).
 *
 * 	@param address	The address to send to.
 * 	@param bundle	The bundle to send
 *	@param pChannel	The Channel that is sending the bundle.
 *				(even if the bundle is not sent reliably, it is still passed
 *				through the filter associated with the channel).
 */
void PacketSender::send( const Address & address,
		UDPBundle & bundle, UDPChannel * pChannel )
{
	MF_ASSERT( address != Address::NONE );
	MF_ASSERT( !pChannel || pChannel->addr() == address );

	MF_ASSERT( !bundle.pChannel() || (bundle.pChannel() == pChannel) );

#if ENABLE_WATCHERS
	sendingStats_.mercuryTimer().start();
#endif // ENABLE_WATCHERS

	if (!bundle.isFinalised())
	{
		// Handle bundles sent off-channel that won't have been finalised by
		// their channels yet.
		bundle.finalise();
		bundle.addReplyOrdersTo( &requestManager_, pChannel );
	}

	// fill in all the footers that are left to us
	Packet * pFirstOverflowPacket = bundle.preparePackets( pChannel,
				seqNumAllocator_, sendingStats_, shouldUseChecksums_ );

	// Finally actually send the darned thing. Do not send overflow packets.
	for (Packet * pPacket = bundle.pFirstPacket();
			pPacket != pFirstOverflowPacket;
			pPacket = pPacket->next())
	{
		this->sendPacket( address, pPacket, pChannel, false );
	}

#if ENABLE_WATCHERS
	sendingStats_.mercuryTimer().stop( 1 );
#endif // ENABLE_WATCHERS

	sendingStats_.numBundlesSent_++;
	sendingStats_.numMessagesSent_ += bundle.numMessages();

	sendingStats_.numReliableMessagesSent_ += bundle.numReliableMessages();
}

Here we see a for loop that walks the current singly linked list of Packets, calling sendPacket on each one to push it into the socket. There is a restriction, however: a single send call must not emit too much data, so bundle.preparePackets computes how far sending may proceed and records that Packet as pFirstOverflowPacket; the loop stops when it reaches it. preparePackets also writes the channel's unique identifier (which is in fact the EntityID) into each Packet:

/**
 *  This method will write the flags on a packet fitting for one that will ride
 *  on this channel. It will also reserve enough space for the footer.
 */
void UDPChannel::writeFlags( Packet * p )
{
	p->enableFlags( Packet::FLAG_ON_CHANNEL );

	if (this->isIndexed())
	{
		p->enableFlags( Packet::FLAG_INDEXED_CHANNEL );
		p->channelID() = id_;
		p->channelVersion() = version_;
		p->reserveFooter( sizeof( ChannelID ) + sizeof( ChannelVersion ) );
	}
	// many lines omitted
}

This sendPacket is still fairly high level: it performs a good deal of statistics gathering, forwarding, and filtering before eventually reaching basicSendWithRetries:

/**
 *	This method sends a packet. No result is returned as it cannot be trusted.
 *	The packet may never get to the other end.
 *
 *	@param address 	The destination address.
 *	@param pPacket	The packet to send.
 *	@param pChannel The channel to be sent on, or NULL if off-channel.
 *	@param isResend If true, this is a resend, otherwise, it is the initial
 *					send.
 */
void PacketSender::sendPacket( const Address & address,
						Packet * pPacket,
						UDPChannel * pChannel, bool isResend )
{
	// some unrelated code omitted

	// Check if we want artificial loss or latency
	if (!this->rescheduleSend( address, pPacket, pChannel ))
	{
		this->sendRescheduledPacket( address, pPacket, pChannel );
	}
}

/**
 *	This method sends the packet after rescheduling has occurred.
 *
 *	@param address 	The destination address.
 *	@param pPacket 	The packet.
 *	@param pChannel The channel, or NULL if off-channel send.
 */
void PacketSender::sendRescheduledPacket( const Address & address,
						Packet * pPacket,
						UDPChannel * pChannel )
{
	PacketFilterPtr pFilter = pChannel ? pChannel->pFilter() : NULL;

	if (pPacketMonitor_)
	{
		pPacketMonitor_->packetOut( address, *pPacket );
	}

	// Otherwise send as appropriate
	if (pFilter)
	{
		pFilter->send( *this, address, pPacket );
	}
	else
	{
		this->basicSendWithRetries( address, pPacket );
	}
}

basicSendWithRetries wraps the low-level socket send: it makes a real attempt to deliver a packet to the given address, and retries briefly if the delivery fails:

/**
 *	Basic packet sending functionality that retries a few times
 *	if there are transitory errors.
 *
 *	@param addr 	The destination address.
 *	@param pPacket 	The packet to send.
 *
 *	@return 		REASON_SUCCESS on success, otherwise an appropriate
 *					Mercury::Reason.
 */
Reason PacketSender::basicSendWithRetries( const Address & addr,
		Packet * pPacket )
{
	// try sending a few times
	int retries = 0;
	Reason reason;

	while (retries <= 3)
	{
		++retries;
#if ENABLE_WATCHERS
		sendingStats_.systemTimer().start();
#endif // ENABLE_WATCHERS

		reason = this->basicSendSingleTry( addr, pPacket );

#if ENABLE_WATCHERS
		sendingStats_.systemTimer().stop( 1 );
#endif // ENABLE_WATCHERS

		if (reason == REASON_SUCCESS)
			return reason;

		// If we've got an error in the queue simply send it again;
		// we'll pick up the error later.
		if (reason == REASON_NO_SUCH_PORT)
		{
			continue;
		}

		// If the transmit queue is full wait 10ms for it to empty.
		if ((reason == REASON_RESOURCE_UNAVAILABLE) ||
				(reason == REASON_TRANSMIT_QUEUE_FULL))
		{
			// some error-handling code
			continue;
		}

		// some other error, so don't bother retrying
		break;
	}

	return reason;
}

The final basicSendSingleTry is what actually touches the raw socket, writing the current packet's data to it; with that, the send path is complete:

/**
 *	Basic packet sending function that just tries to send once.
 *
 *	@param addr 	The destination address.
 *	@param pPacket 	The packet to send.
 *
 *	@return 		REASON_SUCCESS on success otherwise an appropriate
 *					Mercury::Reason.
 */
Reason PacketSender::basicSendSingleTry( const Address & addr, 
		Packet * pPacket )
{
	int len = socket_.sendto( pPacket->data(), pPacket->totalSize(), 
		addr.port, addr.ip );

	if (len == pPacket->totalSize())
	{
		sendingStats_.numBytesSent_ += len + UDP_OVERHEAD;
		sendingStats_.numPacketsSent_++;

		return REASON_SUCCESS;
	}
	// error-handling code omitted
}

Sending Packets is delegated to PacketSender; receiving is likewise delegated to a dedicated class, PacketReceiver. The receive side has many more layers of wrapping above it, though. Below is a typical receive-side call stack:

WorldOfWarplanes.exe!Mercury::PacketReceiver::processFilteredPacket(const Mercury::Address & addr={...}, Mercury::Packet * p=0x243399c0, Mercury::ProcessSocketStatsHelper * pStatsHelper=0x0018f2a4)  Line 347 + 0x11 bytes C++
WorldOfWarplanes.exe!Mercury::PacketFilter::recv(Mercury::PacketReceiver & receiver={...}, const Mercury::Address & addr={...}, Mercury::Packet * pPacket=0x243399c0, Mercury::ProcessSocketStatsHelper * pStatsHelper=0x0018f2a4)  Line 38 C++
WorldOfWarplanes.exe!Mercury::EncryptionFilter::recv(Mercury::PacketReceiver & receiver={...}, const Mercury::Address & addr={...}, Mercury::Packet * pPacket=0x243399c0, Mercury::ProcessSocketStatsHelper * pStatsHelper=0x0018f2a4)  Line 233 + 0x1c bytes C++
WorldOfWarplanes.exe!Mercury::PacketReceiver::processPacket(const Mercury::Address & addr={...}, Mercury::Packet * p=0x0493cc40, Mercury::ProcessSocketStatsHelper * pStatsHelper=0x0018f2a4)  Line 242 + 0x25 bytes C++
WorldOfWarplanes.exe!Mercury::PacketReceiver::processSocket(bool expectingPacket=true)  Line 92 C++
WorldOfWarplanes.exe!Mercury::PacketReceiver::handleInputNotification(int fd=684)  Line 51 + 0x9 bytes C++
WorldOfWarplanes.exe!Mercury::SelectPoller::handleInputNotifications(int & countReady=, fd_set & readFDs={...}, fd_set & writeFDs={...})  Line 305 + 0x29 bytes C++
WorldOfWarplanes.exe!Mercury::SelectPoller::processPendingEvents(double maxWait=0.00000000000000000)  Line 398 + 0x19 bytes C++
WorldOfWarplanes.exe!Mercury::DispatcherCoupling::doTask()  Line 34 + 0x38 bytes C++
WorldOfWarplanes.exe!Mercury::FrequentTasks::process()  Line 112 C++
WorldOfWarplanes.exe!Mercury::EventDispatcher::processOnce(bool shouldIdle=false)  Line 381 C++
WorldOfWarplanes.exe!ServerConnection::processInput()  Line 922 C++

Everything above PacketReceiver::handleInputNotification(int fd=684) in this stack is event-polling plumbing, which we skip here. When a single socket becomes readable, PacketReceiver::handleInputNotification is invoked; it keeps calling processSocket to receive one Packet at a time, until the remaining readable data on the socket can no longer form a Packet:

/*
 *	This method is called when there is data on the socket.
 */
int PacketReceiver::handleInputNotification( int fd )
{
	uint64 processingStartStamps = BW_NAMESPACE timestamp();

	int numPacketsProcessed = 0;

	bool expectingPacket = true; // only true for first call to processSocket()
	bool shouldProcess = true;

	while (shouldProcess)
	{
		Address sourceAddress;
		shouldProcess = this->processSocket( sourceAddress, expectingPacket );
		expectingPacket = false;

		uint64 processingElapsedStamps = 
			BW_NAMESPACE timestamp() - processingStartStamps;

		++numPacketsProcessed;

		// some error-handling code omitted

	}

	return 0;
}

processSocket uses Packet::recvFromEndpoint to try to read data from the socket to fill the current Packet:

/**
 *	This method will read and process any pending data on this object's socket.
 *
 *	@param srcAddr 			This will be filled with the source address of any
 *							packets received.
 *	@param expectingPacket 	If true, a packet was expected to be read, 
 *							otherwise false.
 *
 *	@return 				True if a packet was read, otherwise false.
 */
bool PacketReceiver::processSocket( Address & srcAddr, 
		bool expectingPacket )
{
	stats_.updateSocketStats( socket_ );

	// Used to collect stats
	ProcessSocketStatsHelper statsHelper( stats_ );

	// try a recvfrom
	int len = pNextPacket_->recvFromEndpoint( socket_, srcAddr );

	statsHelper.socketReadFinished( len );

	if (len <= 0)
	{
		this->checkSocketErrors( len, expectingPacket );
		return false;
	}

	// process it if it succeeded
	PacketPtr curPacket = pNextPacket_;
	pNextPacket_ = new Packet();

	Reason ret = this->processPacket( srcAddr, curPacket.get(),
			&statsHelper );

	if ((ret != REASON_SUCCESS) &&
			networkInterface_.isVerbose())
	{
		this->dispatcher().errorReporter().reportException( ret, srcAddr );
	}

	return true;
}

Packet::recvFromEndpoint reads at most PACKET_MAX_SIZE = 1472 bytes at a time into the packet's internal read buffer; this PACKET_MAX_SIZE is also the upper bound on a single Packet's data size:

// The default max size for a packet is the MTU of an ethernet frame, minus the
// overhead of IP and UDP headers.  If you have special requirements for packet
// sizes (e.g. your client/server connection is running over VPN) you can edit
// this to whatever you need.
const int Packet::MAX_SIZE = PACKET_MAX_SIZE;

/**
 *  This method does a recv on the endpoint into this packet's data array,
 *  setting the length correctly on a successful receive.  The return value is
 *  the return value from the low-level recv() call.
 */
int Packet::recvFromEndpoint( Endpoint & ep, Address & addr )
{
	int len = ep.recvfrom( data_, MAX_SIZE,
		(u_int16_t*)&addr.port, (u_int32_t*)&addr.ip );

	if (len >= 0)
	{
		this->msgEndOffset( len );
	}

	return len;
}

It is worth revisiting how UDP receives data. Unlike TCP's byte stream, UDP is received one datagram at a time: when a UDP socket becomes readable, a complete datagram has already arrived inside it. When the application calls recvfrom to receive a UDP datagram, it passes in buf and bufsize, the receive buffer and its size. If bufsize is smaller than the datagram, only the first bufsize bytes are delivered and the remainder is silently discarded; the next recvfrom call already operates on the second datagram. So bufsize must match the per-Packet size cap used when packing, namely PACKET_MAX_SIZE.

So when recvFromEndpoint returns a len greater than 0, a Packet has been received. Since the application layer works in terms of Messages, the next step is to try to recover the Messages from the Packet:

/**
 *	This is the entrypoint for new packets, which just gives it to the filter.
 */
Reason PacketReceiver::processPacket( const Address & addr, Packet * p,
	   ProcessSocketStatsHelper * pStatsHelper )
{
	// some branch handling skipped

	//parse raw.
	return this->processFilteredPacket( addr, p, pStatsHelper );
}

The body of processFilteredPacket is too long to reproduce here; all we need to know is that it looks up the matching Channel pChannel using the incoming address addr, the RPC Interface bound to this PacketReceiver, and the ChannelID carried in the Packet, and once found calls addToReceiveWindow:

// many lines omitted
// should we be looking in a channel
if (pChannel)
{
	UDPChannel::AddToReceiveWindowResult result =
		pChannel->addToReceiveWindow( p, addr, stats_ );
}
// many lines omitted

由于udpPacket可能会被乱序接收,所以这里的addToReceiveWindow的作用就是将接受到的Packet按照其编号进行排序,如果当前接收到的Packet的序号与所期待的Packet的序号相匹配,则会在processFilteredPacket里处理一些若干个连续的Packet

Reason oret = REASON_SUCCESS;
PacketPtr pCurrPacket = p;
PacketPtr pNextPacket = NULL;

// push this packet chain (frequently one) through processOrderedPacket

// NOTE: We check isCondemned on the channel and not isDead. If a channel
// has isDestroyed set to true but isCondemned false, we still want to
// process remaining messages. This can occur if there is a message that
// causes the entity to teleport. Any remaining messages are still
// processed and will likely be forwarded from the ghost entity to the
// recently teleported real entity.

// TODO: It would be nice to display a message if the channel is condemned
// but there are messages on it.

while (pCurrPacket &&
	((pChannel == NULL) || !pChannel->isCondemned()))
{
	// processOrderedPacket expects packets not to be chained, since
	// chaining is used for grouping fragments into bundles.  The packet
	// chain we've set up doesn't have anything to do with bundles, so we
	// break the packet linkage before passing the packets into
	// processOrderedPacket.  This can mean that packets that aren't the one
	// just received drop their last reference, hence the need for
	// pCurrPacket and pNextPacket.
	pNextPacket = pCurrPacket->next();
	pCurrPacket->chain( NULL );

	// Make sure they are actually packets with consecutive sequences.
	MF_ASSERT( pNextPacket.get() == NULL || 
		seqMask( pCurrPacket->seq() + 1 ) == 
			pNextPacket->seq() );

	// At this point, the only footers left on the packet should be the
	// request and fragment footers.
	Reason ret = this->processOrderedPacket( addr, pCurrPacket.get(),
			pChannel.get(), pStatsHelper );

	if (oret == REASON_SUCCESS)
	{
		oret = ret;
	}

	pCurrPacket = pNextPacket;
}

The while loop here consumes the multiple consecutive Packets that have already been received, passing each through processOrderedPacket. Since a single Bundle may be assembled from several Packets, and a single Packet may carry data belonging to several Bundles, a concept finer-grained than Packet, FragmentedBundle, is used to represent the consecutive chunks belonging to one Bundle; conceptually it resembles a string_view bound to a shared_ptr<Bundle>. A complete Bundle is then formed by a singly linked list of FragmentedBundles. Once all packets of a bundle have arrived, UDPBundleProcessor::dispatchMessages is used to dispatch the message callbacks for the now-complete bundle:

/**
 * Process a packet after any ordering guaranteed by reliable channels
 * has been imposed (further ordering guaranteed by fragmented bundles
 * is still to be imposed)
 */
Reason PacketReceiver::processOrderedPacket( const Address & addr, Packet * p,
	UDPChannel * pChannel, ProcessSocketStatsHelper * pStatsHelper )
{
	// many lines omitted
	// We have a complete packet chain.  We can drop the reference in pChain now
	// since the Bundle owns it.
	UDPBundleProcessor outputBundle( p );
	pChain = NULL;

	Reason reason = outputBundle.dispatchMessages(
			networkInterface_.interfaceTable(),
			addr,
			pChannel,
			networkInterface_,
			pStatsHelper );

	if (reason == REASON_CORRUPTED_PACKET)
	{
		RETURN_FOR_CORRUPTED_PACKET();
	}
}

Since a single bundle can contain multiple Messages, dispatchMessages loops over all of the messages stored inside it:

/**
 *	This method is responsible for dispatching the messages on this bundle to
 *	the appropriate handlers.
 *
 *	@param interfaceTable 	The interface table.
 *	@param addr 			The source address of the bundle.
 *	@param pChannel 		The channel.
 *	@param networkInterface The network interface.
 *	@param pStatsHelper 	The socket receive statistics.
 *
 *	@return 				REASON_SUCCESS on success, otherwise an appropriate
 *							Mercury::Reason describing the error.
 */
Reason UDPBundleProcessor::dispatchMessages( InterfaceTable & interfaceTable,
		const Address & addr, UDPChannel * pChannel,
		NetworkInterface & networkInterface, 
		ProcessSocketStatsHelper * pStatsHelper ) const
{
#	define SOURCE_STR (pChannel ? pChannel->c_str() : addr.c_str())
	bool breakLoop = pChannel ? pChannel->isDead() : false;
	Reason ret = REASON_SUCCESS;

	// NOTE: The channel may be destroyed while processing the messages so we
	// need to hold a local reference to keep pChannel valid. 
	ChannelPtr pChannelHolder = pChannel;
	MessageFilterPtr pMessageFilter =
		pChannel ? pChannel->pMessageFilter() : NULL;

	// now we simply iterate over the messages in that bundle
	iterator iter	= this->begin();
	iterator end	= this->end();

	interfaceTable.onBundleStarted( pChannel );

	while (iter != end && !breakLoop)
	{
		// find out what this message looks like
		InterfaceElementWithStats & ie = interfaceTable[ iter.msgID() ];
		// some code omitted
		InterfaceElement updatedIE = ie;
		if (!updatedIE.updateLengthDetails( networkInterface, addr ))
		{
			ERROR_MSG( "UDPBundleProcessor::dispatchMessages( %s ): "
					"Discarding bundle after failure to update length "
					"details for message ID %hu\n",
				SOURCE_STR, (unsigned short int)iter.msgID() );
			ret = REASON_CORRUPTED_PACKET;
			break;
		}
		// get the details out of it
		UnpackedMessageHeader & header = iter.unpack( updatedIE );
		// some code omitted
	}
	// many lines omitted
}

When parsing an individual message, the first step is to obtain the length of its arguments; that logic lives in iter.unpack:

/**
 *	This method unpacks the current message using the given
 *	interface element.
 *
 *	@param ie	InterfaceElement for the current message.
 *
 *	@return		Header describing the current message.
 */
UnpackedMessageHeader & UDPBundleProcessor::iterator::unpack( 
		const InterfaceElement & ie )
{
	uint16	msgBeg = offset_;

	MF_ASSERT( !isUnpacked_ );

	bool isRequest = (nextRequestOffset_ == offset_);

	updatedIE_ = ie;
	// read the standard header
	if (int(offset_) + updatedIE_.headerSize() > int(bodyEndOffset_))
	{
		ERROR_MSG( "UDPBundleProcessor::iterator::unpack( %s ): "
				"Not enough data on stream at %hu for header "
				"(%d bytes, needed %d)\n",
			updatedIE_.name(), offset_, int(bodyEndOffset_) - int(offset_),
			updatedIE_.headerSize() );

		goto errorNoRevert;
	}

	curHeader_.identifier = this->msgID();
	curHeader_.length =
		updatedIE_.expandLength( cursor_->data() + msgBeg, cursor_, isRequest );
	// many lines omitted
}

UDPBundleProcessor::iterator::unpack我们终于见到了之前介绍过的compressLength的逆过程expandLength,在执行完成expandlength之后,当前消息的总长度就存储在curHeader_.length里。有了总长度之后,就可以方便的知道后续哪些packet里有当前message的消息,当所有packet都收集完成了之后,下面的这个函数就会将分散在各个packet里的参数数据进行合并,成为一个单一的连续buffer:

/**
 * 	This method returns the data for the message that the iterator
 * 	is currently pointing to.
 *
 * 	@return 	Pointer to message data.
 */
const char * UDPBundleProcessor::iterator::data()
{
	// does this message go off the end of the packet?
	if (dataOffset_ + dataLength_ <= bodyEndOffset_)
	{
		// no, ok, we're safe
		return cursor_->data() + dataOffset_;
	}

	// is there another packet? assert that there is because 'unpack' would have
	// flagged an error if the next packet was required but missing
	MF_ASSERT( cursor_->next() != NULL );
	if (cursor_->next() == NULL) return NULL;
	// also assert that data does not start mid-way into the next packet
	MF_ASSERT( dataOffset_ <= bodyEndOffset_ );	// (also implied by 'unpack')

	// is the entirety of the message data on the next packet?
	if (dataOffset_ == bodyEndOffset_ &&
		Packet::HEADER_SIZE + dataLength_ <= cursor_->next()->msgEndOffset())
	{
		// yes, easy then
		return cursor_->next()->body();
	}

	// ok, it's half here and half there, time to make a temporary buffer.
	// note that a better idea might be to return a stream from this function.

	if (dataBuffer_ != NULL)
	{
		// Already created a buffer for it.  
		return dataBuffer_;
	}

	// Buffer is destroyed in operator++() and in ~iterator().
	dataBuffer_ = new char[dataLength_];
	Packet *thisPack = cursor_;
	uint16 thisOff = dataOffset_;
	uint16 thisLen;
	for (int len = 0; len < dataLength_; len += thisLen)
	{
		if (thisPack == NULL)
		{
			DEBUG_MSG( "UDPBundleProcessor::iterator::data: "
				"Run out of packets after %d of %d bytes put in temp\n",
				len, dataLength_ );
			return NULL;
		}
		thisLen = thisPack->msgEndOffset() - thisOff;
		if (thisLen > dataLength_ - len) thisLen = dataLength_ - len;
		memcpy( dataBuffer_ + len, thisPack->data() + thisOff, thisLen );
		thisPack = thisPack->next();
		thisOff = Packet::HEADER_SIZE;
	}
	return dataBuffer_;
}

This dataBuffer_ is the final contiguous buffer of argument data; a BinaryIStream can be constructed over it, and the original arguments are then parsed back out via operator>>.

Channel and Reliable UDP

In the opening chapters of this book we noted that UDP is not a protocol the application layer can use directly, because datagrams can be lost or reordered at many points during sending, transit, and receipt. For the application layer to trust the network to provide reliable, ordered message delivery just as TCP does, the network framework must supply a reliable-UDP implementation that papers over the differences in the underlying protocols; KCP is a typical example of such a protocol. In BigWorld, application traffic runs mainly over UDP, so its network layer provides its own reliable-UDP implementation, called UDPChannel:

/**
 *	Channels are used for regular communication channels between two address.
 *
 *	@note Any time you call 'bundle' you may get a different bundle to the one
 *	you got last time, because the Channel decided that the bundle was full
 *	enough to send. This does not occur on high latency channels (or else
 *	tracking numbers would get very confusing).
 *
 *	@note If you use more than one Channel on the same address, they share the
 *	same bundle. This means that:
 *
 *	@li Messages (and message sequences where used) must be complete between
 *		calls to 'bundle' (necessary due to note above anyway)
 *
 *	@li Each channel must say send before the bundle is actually sent.
 *
 *	@li Bundle tracking does not work with multiple channels; only the last
 *		Channel to call 'send' receives a non-zero tracking number (or possibly
 *		none if deleting a Channel causes it to be sent), and only the first
 *		Channel on that address receives the 'bundleLost' call.
 *
 * 	@ingroup mercury
 */
class UDPChannel : public Channel
{
	// many lines omitted
};

UDPChannel's parent class Channel is a logical channel for sending and receiving data; it provides the virtual interface for Bundle-based send and receive. Channel has another subclass, TCPChannel, which handles TCP transport; since TCP is already a reliable protocol, TCPChannel has little logic beyond prefixing a Bundle with a length field and pushing it onto the send queue. UDPChannel, by contrast, must split a Bundle's data into multiple Packets at the lower layer and send them to the peer in turn, because a single datagram larger than the typical Ethernet MTU would be fragmented, raising the risk of loss and reordering. The peer receiving the data must also use a UDPChannel; only when both ends use UDPChannel can the framework's machinery deliver reliable transport. Let us now look roughly at how UDPChannel achieves it.

Reliable transport over UDP first requires assigning each outgoing packet a monotonically increasing sequence number; Packet provides a field for this:

/// Sequence number, or Channel::SEQ_NULL if not set
SeqNum		seq_;

This field defaults to 0 when a Packet is first created; it is filled in only when PacketSender::send runs, via UDPBundle::preparePackets:


/**
 * 	This method sends a bundle to the given address.
 *
 * 	Note: any pointers you have into the packet may become invalid after this
 * 	call (and whenever a channel calls this too).
 *
 * 	@param address	The address to send to.
 * 	@param bundle	The bundle to send
 *	@param pChannel	The Channel that is sending the bundle.
 *				(even if the bundle is not sent reliably, it is still passed
 *				through the filter associated with the channel).
 */
void PacketSender::send( const Address & address,
		UDPBundle & bundle, UDPChannel * pChannel )
{
	MF_ASSERT( address != Address::NONE );
	MF_ASSERT( !pChannel || pChannel->addr() == address );

	MF_ASSERT( !bundle.pChannel() || (bundle.pChannel() == pChannel) );

#if ENABLE_WATCHERS
	sendingStats_.mercuryTimer().start();
#endif // ENABLE_WATCHERS

	if (!bundle.isFinalised())
	{
		// Handle bundles sent off-channel that won't have been finalised by
		// their channels yet.
		bundle.finalise();
		bundle.addReplyOrdersTo( &requestManager_, pChannel );
	}

	// fill in all the footers that are left to us
	Packet * pFirstOverflowPacket = bundle.preparePackets( pChannel,
				seqNumAllocator_, sendingStats_, shouldUseChecksums_ );

	// the subsequent socket-delivery logic is omitted
}

This function is fairly long; for now we focus on the sequence-number allocation part of its logic:

/**
 *	This method prepares packets this bundle for sending.
 *
 *	@param pChannel			The channel, or NULL for off-channel sending.
 *	@param seqNumAllocator 	The network interface's sequence number allocator,
 *							used for off-channel sending.
 *	@param sendingStats 	The sending stats to update.
 */
Packet * UDPBundle::preparePackets( UDPChannel * pChannel,
		SeqNumAllocator & seqNumAllocator,
		SendingStats & sendingStats,
		bool shouldUseChecksums )
{
	// fill in all the footers that are left to us
	Packet * pFirstOverflowPacket = NULL;

	int	numPackets = this->numDataUnits();
	SeqNum firstSeq = 0;
	SeqNum lastSeq = 0;

	// Write footers for each packet.
	for (Packet * pPacket = this->pFirstPacket();
			pPacket;
			pPacket = pPacket->next())
	{
		MF_ASSERT( pPacket->msgEndOffset() >= Packet::HEADER_SIZE );

		// many lines omitted

		this->writeFlags( pPacket );

		if (pChannel)
		{
			pChannel->writeFlags( pPacket );
		}

		if ((pChannel && pChannel->isExternal()) ||  
			pPacket->hasFlags( Packet::FLAG_IS_RELIABLE ) || 
			pPacket->hasFlags( Packet::FLAG_IS_FRAGMENT )) 
		{ 
			pPacket->reserveFooter( sizeof( SeqNum ) ); 
			pPacket->enableFlags( Packet::FLAG_HAS_SEQUENCE_NUMBER ); 
		} 

		// At this point, pPacket->back() is positioned just after the message
		// data, so we advance it to the end of where the footers end, then
		// write backwards towards the message data. We check that we finish
		// up back at the message data as a sanity check.
		const int msgEndOffset = pPacket->msgEndOffset();
		pPacket->grow( pPacket->footerSize() );

		
		// many lines omitted
		// Add the sequence number
		if (pPacket->hasFlags( Packet::FLAG_HAS_SEQUENCE_NUMBER ))
		{
			// If we're sending reliable traffic on a channel, use the
			// channel's sequence numbers.  Otherwise use the nub's.
			pPacket->seq() =
				(pChannel && pPacket->hasFlags( Packet::FLAG_IS_RELIABLE )) ?
					pChannel->useNextSequenceID() :
					seqNumAllocator.getNext();
			
			pPacket->packFooter( pPacket->seq() );

			if (pPacket == pFirstPacket_)
			{
				firstSeq = pPacket->seq();
				lastSeq = pPacket->seq() + numPackets - 1;
			}
		}

		// many lines omitted
	}

	return pFirstOverflowPacket;
}

preparePackets loops over every Packet in the current Bundle. If the Bundle requires reliable delivery, all of its Packets carry the Packet::FLAG_IS_RELIABLE reliability flag; in that case reserveFooter reserves four bytes at the end of the Packet for the sequence number to be filled in later, and the Packet::FLAG_HAS_SEQUENCE_NUMBER flag is set along the way to mark that the packet carries a sequence number. reserveFooter does not allocate any extra dynamic memory; it simply adds to footerSize_, the running total size of this packet's footer section:

void reserveFooter( int nBytes ) { footerSize_ += nBytes; }
int footerSize() const		{ return footerSize_; }

Once all footers have been accounted for, Packet::grow extends the memory boundary past the existing data:

void grow( int nBytes )				{ msgEndOffset_ += nBytes; }

There is no need to worry that growing msgEndOffset_ will overrun the capacity of the internal data_ array: each Packet reserves a worst-case RESERVED_FOOTER_SIZE bytes that payload writes may never occupy, and this region is subtracted up front when computing the Packet's available space:

/// The amount of space that is reserved for fixed-length footers on a
/// packet.  This is done so that the bundle logic can always assume that
/// these footers will fit and not have to worry about pre-allocating them.
/// This is currently 27 bytes, roughly 1.5% of the capacity of a packet, so
/// there's not too much wastage.
static const int RESERVED_FOOTER_SIZE =
	sizeof( Offset ) + // FLAG_HAS_REQUESTS
	sizeof( AckCount ) + // FLAG_HAS_ACKS
	sizeof( SeqNum ) + // FLAG_HAS_SEQUENCE_NUMBER
	sizeof( SeqNum ) * 2 + // FLAG_IS_FRAGMENT
	sizeof( ChannelID ) + sizeof( ChannelVersion ) + // FLAG_INDEXED_CHANNEL
	sizeof( Checksum ); // FLAG_HAS_CHECKSUM

int freeSpace() const
{
	return MAX_SIZE -
		RESERVED_FOOTER_SIZE -
		msgEndOffset_ -
		footerSize_ -
		extraFilterSize_;
}

Footers are then written through the dedicated packFooter interface, which fills the given value's bytes working back from the end of data_. Once every footer has been written, msgEndOffset_ again points at the end of the real payload, and the footerSize_ bytes that follow msgEndOffset_ are all footer data:


/**
 *  This method writes a footer to the back of this packet.  It should only
 *  be called from NetworkInterface::send() and assumes that size_ has been
 *  artificially increased so that it points to the end of the footers, the
 *  idea being that we work back towards the real body end.
 */
template <class TYPE>
void packFooter( TYPE value )
{
	msgEndOffset_ -= sizeof( TYPE );

	switch( sizeof( TYPE ) )
	{
		case sizeof( uint8 ):
			*(TYPE*)this->back() = value; break;

		case sizeof( uint16 ):
			*(TYPE*)this->back() = BW_HTONS( value ); break;

		case sizeof( uint32 ):
			*(TYPE*)this->back() = BW_HTONL( value ); break;

		default:
			CRITICAL_MSG( "Footers of size %" PRIzu " aren't supported",
				sizeof( TYPE ) );
	}
}

Later in preparePackets, when the current packet is found to carry the Packet::FLAG_HAS_SEQUENCE_NUMBER flag, an increasing sequence number is generated and written into the previously reserved slot via packFooter. There are two sources for this number; in the normal case the packets backing our RPCs are Packet::FLAG_IS_RELIABLE, so UDPChannel::useNextSequenceID is used:

/**
 * 	This method returns the next sequence ID, and then increments it.
 *
 * 	@return The next sequence ID.
 */
SeqNum UDPChannel::useNextSequenceID()
{
	SeqNum	retSeq = largeOutSeqAt_.getNext();

	if (this->isInternal())
	{
		int usage = this->sendWindowUsage();
		int & threshold = this->sendWindowWarnThreshold();

		if (usage > threshold)
		{
			WARNING_MSG( "UDPChannel::useNextSequenceID( %s ): "
							"Send window backlog is now %d packets, "
							"exceeded previous max of %d, "
							"critical size is %u\n",
						this->c_str(), usage, threshold, windowSize_ );

			threshold = usage;
		}

		if (this->isIndexed() &&
				(s_pSendWindowCallback_ != NULL) &&
				(usage > s_sendWindowCallbackThreshold_))
		{
			(*s_pSendWindowCallback_)( *this );
		}
	}

	return retSeq;
}

这里的largeOutSeqAt_UDPChannel内的一个成员变量,作为递增序列号发生器来使用。在这里生成下一个递增序列号的时候,会顺带的检查一下现在的发送窗口里有多少个Packet还没有发送出去,如果数值太大则会有一个警告,同时会触发一个s_pSendWindowCallback_的回调。

After preparePackets has filled in each Packet's sequence number, it calls addResendTimer to register every packet that needs reliable delivery as a Packet eligible for retransmission:

// set up the reliable machinery
if (pPacket->hasFlags( Packet::FLAG_IS_RELIABLE ))
{
	if (pChannel)
	{
		const ReliableOrder *roBeg, *roEnd;

		if (pChannel->isInternal())
		{
			roBeg = roEnd = NULL;
		}
		else
		{
			this->reliableOrders( pPacket, roBeg, roEnd );
		}

		if (!pChannel->addResendTimer( pPacket->seq(), pPacket, 
				roBeg, roEnd ))
		{
			if (pFirstOverflowPacket == NULL)
			{
				pFirstOverflowPacket = pPacket;
			}
			// return REASON_WINDOW_OVERFLOW;
		}
		else
		{
			MF_ASSERT( pFirstOverflowPacket == NULL );
		}
	}
}

Inside addResendTimer, an UnackedPacket is constructed for each Packet to be sent, the current time is recorded, and the UnackedPacket is stored into the UDPChannel's unackedPackets_ array:

/**
 *	This method records a packet that may need to be resent later if it is not
 *	acknowledged. It is called when a packet is sent on our behalf.
 *
 *	@return false if the window size was exceeded.
 */
bool UDPChannel::addResendTimer( SeqNum seq, Packet * p,
		const ReliableOrder * roBeg, const ReliableOrder * roEnd )
{
	MF_ASSERT( (oldestUnackedSeq_ == SEQ_NULL) ||
			unackedPackets_[ oldestUnackedSeq_ ] );
	MF_ASSERT( seq == p->seq() );

	UnackedPacket * pUnackedPacket = new UnackedPacket( p );

	// If this channel has no unacked packets, record this as the oldest.
	if (oldestUnackedSeq_ == SEQ_NULL)
	{
		oldestUnackedSeq_ = seq;
	}

	// Fill it in
	pUnackedPacket->lastSentAtOutSeq_ = seq;

	uint64 now = timestamp();
	pUnackedPacket->lastSentTime_ = now;
	lastReliableSendTime_ = now;

	pUnackedPacket->wasResent_ = false;

	if (roBeg != roEnd)
	{
		pUnackedPacket->reliableOrders_.assign( roBeg, roEnd );
	}

	// Grow the unackedPackets_ array, if necessary.
	if (seqMask( seq - oldestUnackedSeq_ + 1 ) > unackedPackets_.size())
	{
		unackedPackets_.doubleSize( oldestUnackedSeq_ );

		if (this->networkInterface().isVerbose())
		{
			INFO_MSG( "UDPChannel::addResendTimer( %s ): "
					"Doubled send buffer size to %u\n",
				this->c_str(),
				unackedPackets_.size() );
		}
	}

	MF_ASSERT( unackedPackets_[ seq ] == NULL );
	unackedPackets_[ seq ] = pUnackedPacket;

	MF_ASSERT( (oldestUnackedSeq_ == SEQ_NULL) ||
			unackedPackets_[ oldestUnackedSeq_ ] );

	if (seqMask( largeOutSeqAt_ - oldestUnackedSeq_ ) >= windowSize_)
	{
		// Make sure that we at least send occasionally.
		UnackedPacket * pPrevUnackedPacket =
			unackedPackets_[ seqMask( smallOutSeqAt_ - 1 ) ];

		if ((pPrevUnackedPacket == NULL) ||
			(now - pPrevUnackedPacket->lastSentTime_ >
				minInactivityResendDelay_))
		{
			this->sendUnacked( *unackedPackets_[ smallOutSeqAt_ ] );
			smallOutSeqAt_ = seqMask( smallOutSeqAt_ + 1 );
		}

		this->checkOverflowErrors();
		//We shouldn't send now
		return false;
	}
	else
	{
		//We should send now
		smallOutSeqAt_ = largeOutSeqAt_;
		return true;
	}
}

The second half of this function implements something much like TCP's sliding window. oldestUnackedSeq_ is the smallest sequence number whose Packet has not yet been ACKed by the peer, largeOutSeqAt_ is the highest sequence number of the Packets sent out so far, and smallOutSeqAt_ is the smallest sequence number among the Packets still waiting to be sent:

uint32			windowSize_;

/// Generally, the sequence number of the next packet to be sent.
SeqNum			smallOutSeqAt_; // This does not include packets in 
								// overflowPackets_

SeqNumAllocator	largeOutSeqAt_; // This does include packets in 
								// overflowPackets_

/// The sequence number of the oldest unacked packet on this channel.
SeqNum			oldestUnackedSeq_;

/// The last time a reliable packet was sent (for the first time) on this
/// channel, as a timestamp.
uint64			lastReliableSendTime_;

如果largeOutSeqAt_oldestUnackedSeq_两者的差值大于等于指定的windowSize_,则代表目前已发送但是没有确认接受的包已经超过了阈值。此时会认为当前包要暂存下来,不能直接发出,等待窗口有余量的时候再发,避免信道拥堵时的进一步加重,所以这里会返回false。在返回之前会判断一下近期一段时间minInactivityResendDelay_内是否有新的包发从出去,如果没有的话则强制发出下一个等待处理的包,并更新smallOutSeqAt_++。这样做的目的是强制推进一下滑动窗口,通知对端当前已经收到的ACK的最大值,类似于一种心跳机制。

If the count of sent-but-unacked packets is low enough, the current Packet is considered safe to send and the function returns true. Before returning, smallOutSeqAt_ is updated to largeOutSeqAt_, the highest sequence number sent so far; when the network is healthy there are no unacked Packets, and largeOutSeqAt_ is simply the new Packet's sequence number.

Besides driving the sliding window, the sender must also process the ACKs of packets it has sent; that logic lives in handleAck:

/**
 *	This method removes a packet from the collection of packets that have been
 *	sent but not acknowledged. It is called when an acknowledgement to a packet
 *	on this channel is received.
 *
 *  Returns false on error, true otherwise.
 */
bool UDPChannel::handleAck( SeqNum seq )
{
	MF_ASSERT( (oldestUnackedSeq_ == SEQ_NULL) ||
			unackedPackets_[ oldestUnackedSeq_ ] );

	// Make sure the sequence number is valid
	// some error handling omitted

	// now make sure there's actually a packet there
	UnackedPacket * pUnackedPacket = unackedPackets_[ seq ];
	if (pUnackedPacket == NULL)
	{
		return true;
	}

	// Update the average RTT for this channel, if this packet hadn't already
	// been resent.
	// RTT calculation code omitted

	// unrelated code omitted

	// If we released the oldest unacked packet, figure out the new one
	if (seq == oldestUnackedSeq_)
	{
		oldestUnackedSeq_ = SEQ_NULL;
		for (uint i = seqMask( seq+1 );
			 i != largeOutSeqAt_;
			 i = seqMask( i+1 ))
		{
			if (unackedPackets_[ i ])
			{
				oldestUnackedSeq_ = i;
				break;
			}
		}
	}

	// If the incoming seq is after the last ack, then it is the new last ack
	if (seqLessThan( highestAck_, seq ))
	{
		highestAck_ = seq;
	}

	// Now we can release the unacked packet
	bw_safe_delete( pUnackedPacket );
	unackedPackets_[ seq ] = NULL;

	MF_ASSERT( oldestUnackedSeq_ == SEQ_NULL ||
			unackedPackets_[ oldestUnackedSeq_ ] );

	while (seqMask(smallOutSeqAt_ - oldestUnackedSeq_) < windowSize_ &&
		   unackedPackets_[ smallOutSeqAt_ ])
	{
		this->sendUnacked( *unackedPackets_[ smallOutSeqAt_ ] );
		smallOutSeqAt_ = seqMask( smallOutSeqAt_ + 1 );
	}

	return true;
}

The logic of this function is fairly simple and breaks down into four parts:

  1. If the sequence number being ACKed is exactly oldestUnackedSeq_, the smallest previously unACKed one, then oldestUnackedSeq_ is updated by scanning forward through all sent packets for the first one still unACKed. A simple increment would not do here, because handleAck may run out of order: the ACK for packet 5 can arrive before the ACK for packet 4.
  2. highestAck_ is updated to the largest ACK received so far. Note that this does not imply that every packet before highestAck_ has been ACKed.
  3. The stored copy of the Packet is released via bw_safe_delete: once a packet has been ACKed it will never be retransmitted, so there is no need to keep a backup.
  4. If the sliding window is not yet full, the next pending packet at smallOutSeqAt_ is sent out and smallOutSeqAt_ is incremented.

If the peer notifies this UDPChannel, through some mechanism, that every packet before a given sequence number endSeq has been received in order, UDPChannel::handleCumulativeAck is called. It starts from oldestUnackedSeq_, the smallest unACKed sequence number, and calls handleAck for every packet in the range [oldestUnackedSeq_, endSeq):

/**
 *	This method handles a cumulative ACK. This indicates that all packets
 *	BEFORE a sequence number have been received by the remote end.
 *
 *  @return False on error, true otherwise.
 */
bool UDPChannel::handleCumulativeAck( SeqNum endSeq )
{
	// Make sure the sequence number is valid
	// error-handling code omitted

	if (!this->hasUnackedPackets())
	{
		return true;
	}

	// Check that the ACK is not in the future.
	// Note: endSeq is first seqNum after what's been received.
	// error-handling code omitted

	SeqNum seq = oldestUnackedSeq_;

	// Note: Up to but not including endSeq
	while (seqLessThan( seq, endSeq ))
	{
		this->handleAck( seq );
		seq = seqMask( seq + 1 );
	}

	return true;
}

If some packets go unACKed for too long, they have to be retransmitted proactively. That logic lives in UDPChannel::checkResendTimers, which is called before every packet is sent:


/**
 *	This method sends the given bundle on this channel. If no bundle is
 *	supplied, the channel's own bundle will be sent.
 *
 *	@param pBundle 	The bundle to send, or NULL to send the channel's own
 *					bundle.
 */
void Channel::send( Bundle * pBundle /* = NULL */ )
{
	// many lines omitted

	this->doPreFinaliseBundle( *pBundle );

	// many lines omitted
}
/*
 *	Override from Channel.
 */
void UDPChannel::doPreFinaliseBundle( Bundle & bundle )
{
	// Tack on piggybacks.

	UDPBundle & udpBundle = static_cast< UDPBundle & >( bundle );
	this->checkResendTimers( udpBundle );
}


/**
 *	This method resends any unacked packets as appropriate. This can be because
 *	of time since last sent, receiving later acks before earlier ones.
 */
void UDPChannel::checkResendTimers( UDPBundle & bundle )
{
	// There are no un-acked packets
	if (oldestUnackedSeq_ == SEQ_NULL)
	{
		return;
	}

	// Don't do anything if the remote process has failed
	// handling of a failed remote process omitted

	// If we have unacked packets that are getting a bit old, then resend the
	// ones that are older than we'd like.  Anything that has taken more than
	// twice the RTT on the channel to come back is considered to be too old.
	uint64 now = timestamp();
	uint64 resendPeriod =
		std::max( roundTripTime_*2, minInactivityResendDelay_ );
	uint64 lastReliableSendTime = this->lastReliableSendOrResendTime();

	const bool isIrregular = !this->isRemoteRegular();
	const SeqNum endSeq = isIrregular ? smallOutSeqAt_ : highestAck_;
	const bool isDebugVerbose = this->networkInterface().isDebugVerbose();

	int numResends = 0;

	// TODO: 8 is a magic number and would be nice to be more scientific.
	// The idea is to throttle the resends a little in extreme situations. We
	// want to send enough so that no (or not too many) packets are lost but
	// still be able to send more when the RTT is large.
	const int MAX_RESENDS = windowSize_/8;

	for (SeqNum seq = oldestUnackedSeq_;
		seqLessThan( seq, endSeq ) && numResends < MAX_RESENDS;
		seq = seqMask( seq + 1 ))
	{
		UnackedPacket * pUnacked = unackedPackets_[ seq ];

		// Send if the packet is old, or we have a later ack
		if (pUnacked != NULL)
		{
			const bool hasNewerAck =
				 seqLessThan( pUnacked->lastSentAtOutSeq_, highestAck_);

			const bool shouldResend = hasNewerAck ||
				(isIrregular && (now - pUnacked->lastSentTime_ > resendPeriod));

			const SeqNum prevLastSentAtOutSeq = pUnacked->lastSentAtOutSeq_;
			const uint64 prevLastSentTime = pUnacked->lastSentTime_;

			if (shouldResend)
			{
				bool piggybacked = this->resend( seq, bundle );
				++numResends;
				// warning logs omitted
			}
		}
	}
}

This function first computes resendPeriod, the longest unacknowledged interval it will tolerate, then walks all the sent-but-unacknowledged UnackedPackets; any whose time since last send exceeds resendPeriod (or that has been overtaken by a later ACK) is retransmitted via resend:

/**
 *  Resends an un-acked packet by the most sensible method available.
 *
 *  @return Returns true if the packet is no longer un-acked.
 */
bool UDPChannel::resend( SeqNum seq, UDPBundle & bundle )
{
	++numPacketsResent_;

	UnackedPacket & unacked = *unackedPackets_[ seq ];

	// If possible, piggyback this packet onto the next outgoing bundle
	if (this->isExternal() &&
		!unacked.pPacket_->hasFlags( Packet::FLAG_IS_FRAGMENT ) &&
		(unackedPackets_[ smallOutSeqAt_ ] == NULL)) // Not going to overflow
	{
		if (bundle.piggyback(
				seq, unacked.reliableOrders_, unacked.pPacket_.get() ))
		{
			unacked.wasResent_ = true; // Don't count this for RTT calculations
			this->handleAck( seq );
			return true;
		}
	}

	// If there are any acks on this packet, then they will be resent too, but
	// it does no harm.
	this->sendUnacked( unacked );

	return false;
}

The simplest path through resend just calls sendUnacked to retransmit, recording the packet's resend timestamp and marking it as resent:

/**
 *  Resends an un-acked packet by the most sensible method available.
 */
void UDPChannel::sendUnacked( UnackedPacket & unacked )
{
	unacked.pPacket_->updateChannelVersion( version_, id_ );

	pNetworkInterface_->sendPacket( addr_, unacked.pPacket_.get(), this, 
		/* isResend: */ true );

	unacked.lastSentAtOutSeq_ = smallOutSeqAt_;
	unacked.wasResent_ = true;

	uint64 now = timestamp();
	unacked.lastSentTime_ = now;
	lastReliableResendTime_ = now;
}

If this is an external channel facing a client, the packet being resent holds a complete bundle (i.e. it is not a fragment), and the sliding window still permits the next packet to go out, then piggyback chains the current Packet onto the end of the Packet list of the bundle about to be sent, and the packet is manually ACKed. This fast path effectively merges the retransmitted Packet into the outgoing bundle.

At this point the sending side of UDPChannel has sequence-number allocation, a sliding window, acknowledgement handling, and timeout retransmission: flow control and reliability roughly equivalent to a TCP sender. Next, let's see how the receiving side of UDPChannel takes Packets in and returns ACKs. The relevant code is in PacketReceiver::processFilteredPacket, which we met earlier; back then we only looked at how the receiver unpacks a Bundle from a Packet and skipped the ACK part. Let's now see how this function emits ACKs:

/**
 *	This function has to be very robust, if we intend to use this transport over
 *	the big bad internet. We basically have to assume it'll be complete garbage.
 */
Reason PacketReceiver::processFilteredPacket( const Address & addr,
		Packet * p, ProcessSocketStatsHelper * pStatsHelper )
{
	// many lines omitted

	// now do something if it's reliable
	if (p->hasFlags( Packet::FLAG_IS_RELIABLE ))
	{
		// first make sure it has a sequence number, so we can address it
		// error-handling code omitted

		// should we be looking in a channel
		if (pChannel)
		{
			UDPChannel::AddToReceiveWindowResult result =
				pChannel->addToReceiveWindow( p, addr, stats_ );

			if (!pChannel->isLocalRegular())
			{
				shouldSendChannel = true;
			}

			if (result != UDPChannel::PACKET_IS_NEXT_IN_WINDOW)
			{
				// The packet is not corrupted, and has either already been
				// received, or is too early and has been buffered. In either
				// case, we send the ACK immediately, as long as the channel is
				// established and is irregular.
				if (result != UDPChannel::PACKET_IS_CORRUPT)
				{
					if (pChannel->isEstablished() && shouldSendChannel)
					{
						UDPBundle emptyBundle;
						pChannel->send( &emptyBundle );
					}

					return REASON_SUCCESS;
				}

				// The packet has an invalid sequence number.
				else
				{
					RETURN_FOR_CORRUPTED_PACKET();
				}
			}
		}
	}
}

The handling here depends on the value returned by addToReceiveWindow. PACKET_IS_NEXT_IN_WINDOW means this Packet carries exactly the next expected sequence number; any other (non-corrupt) value means it arrived out of order, and an empty UDPBundle is then built and sent. The sole purpose of this empty UDPBundle is to ACK the current packet immediately, but the call chain behind it is rather obscure. Skipping the parts of the UDPChannel::send chain we have already analysed, we jump straight to UDPBundle::preparePackets:

/**
 *	This method prepares packets this bundle for sending.
 *
 *	@param pChannel			The channel, or NULL for off-channel sending.
 *	@param seqNumAllocator 	The network interface's sequence number allocator,
 *							used for off-channel sending.
 *	@param sendingStats 	The sending stats to update.
 */
Packet * UDPBundle::preparePackets( UDPChannel * pChannel,
		SeqNumAllocator & seqNumAllocator,
		SendingStats & sendingStats,
		bool shouldUseChecksums )
{
	// fill in all the footers that are left to us
	Packet * pFirstOverflowPacket = NULL;

	int	numPackets = this->numDataUnits();
	SeqNum firstSeq = 0;
	SeqNum lastSeq = 0;

	// Write footers for each packet.
	for (Packet * pPacket = this->pFirstPacket();
			pPacket;
			pPacket = pPacket->next())
	{
		MF_ASSERT( pPacket->msgEndOffset() >= Packet::HEADER_SIZE );

		if (shouldUseChecksums)
		{
			// Reserve space for the checksum footer

			MF_ASSERT( !pPacket->hasFlags( Packet::FLAG_HAS_CHECKSUM ) );
			pPacket->reserveFooter( sizeof( Packet::Checksum ) );
			pPacket->enableFlags( Packet::FLAG_HAS_CHECKSUM );
		}

		this->writeFlags( pPacket );

		if (pChannel)
		{
			pChannel->writeFlags( pPacket );
		}
		// many lines omitted
	}
	// many lines omitted
}

Here, pChannel->writeFlags( pPacket ) piggybacks ACK information onto the current Packet whenever it still has enough free space:

/**
 *  This method will write the flags on a packet fitting for one that will ride
 *  on this channel. It will also reserve enough space for the footer.
 */
void UDPChannel::writeFlags( Packet * p )
{
	p->enableFlags( Packet::FLAG_ON_CHANNEL );

	// unrelated code omitted

	// Add a cumulative ACK. This indicates that all packets BEFORE a given seq
	// have been received.
	if (p->freeSpace() >= int( sizeof( SeqNum )))
	{
		p->enableFlags( Packet::FLAG_HAS_CUMULATIVE_ACK );
		p->reserveFooter( sizeof( SeqNum ) );

		Acks::iterator iter = acksToSend_.begin();

		while (iter != acksToSend_.end())
		{
			// Need to go through all due to wrap-around case.
			if (seqLessThan( *iter, inSeqAt_ ) )
			{
				acksToSend_.erase( iter++ );
			}
			else
			{
				++iter;
			}
		}
	}

	// many lines omitted
}

If the Packet has at least four bytes (sizeof( SeqNum )) of free space left, which is trivially true for an empty packet, the cumulative ACK is added first: Packet::FLAG_HAS_CUMULATIVE_ACK is set in the Packet's flags, and inSeqAt_, the sequence number of the next expected in-order packet, is packed into the footer. inSeqAt_ is maintained in UDPChannel::addToReceiveWindow: when a new packet's sequence number equals inSeqAt_, it keeps incrementing inSeqAt_ until that number no longer appears among the out-of-order Packets already buffered:

UDPChannel::AddToReceiveWindowResult UDPChannel::addToReceiveWindow( 
		Packet * p, const Address & srcAddr, PacketReceiverStats & stats )
{
	const SeqNum seq = p->seq();
	const bool isDebugVerbose = this->networkInterface().isDebugVerbose();
	// many lines omitted
	// check the good case first
	if (seq == inSeqAt_)
	{
		inSeqAt_ = seqMask( inSeqAt_ + 1 );

		Packet * pPrev = p;
		Packet * pBufferedPacket = bufferedReceives_[ inSeqAt_ ].get();

		// Attach as many buffered packets as possible to this one.
		while (pBufferedPacket != NULL)
		{
			// Link it to the prev packet then remove it from the buffer.
			pPrev->chain( pBufferedPacket );
			bufferedReceives_[ inSeqAt_ ] = NULL;
			--numBufferedReceives_;

			// Advance to the next buffered packet.
			pPrev = pBufferedPacket;
			inSeqAt_ = seqMask( inSeqAt_ + 1 );
			pBufferedPacket = bufferedReceives_[ inSeqAt_ ].get();
		}

		return PACKET_IS_NEXT_IN_WINDOW;
	}
	// many lines omitted
}

Once a cumulative ACK is going to be sent, every sequence number smaller than inSeqAt_ is erased from acksToSend_, the set of ACKs still waiting to be sent, since the cumulative ACK already covers them.

Besides the cumulative ACK, as many discrete ACKs as will fit are packed into the remaining free space; this code sits right after the cumulative-ACK handling in UDPChannel::writeFlags:

// Put on as many acks as we can.
if (!acksToSend_.empty() &&
	p->freeSpace() >= int(sizeof( Packet::AckCount ) + sizeof( SeqNum )))
{
	//Required to make GCC link this, something to do with templates
	const size_t MAX_ACKS = Packet::MAX_ACKS;
	p->enableFlags( Packet::FLAG_HAS_ACKS );
	p->reserveFooter( sizeof( Packet::AckCount ) );

	const size_t minSpace = p->freeSpace() / sizeof( SeqNum );
	const size_t minSpaceSize = std::min( minSpace, acksToSend_.size() );
	const size_t nAcks = std::min( minSpaceSize, MAX_ACKS );
	p->nAcks() = static_cast<Packet::AckCount>(nAcks);

	p->reserveFooter( sizeof( SeqNum ) * p->nAcks() );
}

Here the remaining free space determines how many discrete ACKs the current Packet can still carry, stored in p->nAcks(). When the footer is actually written, p->nAcks() sequence numbers are taken from acksToSend_, packed into the footer, and erased from acksToSend_:

/**
 *  This method will write the appropriate flags on a packet to indicate that
 *  it is on this channel. It must be called after writeFlags.
 */
void UDPChannel::writeFooter( Packet * p )
{
	if (p->hasFlags( Packet::FLAG_INDEXED_CHANNEL ))
	{
		p->packFooter( p->channelID() );
		p->packFooter( p->channelVersion() );
	}

	if (p->hasFlags( Packet::FLAG_HAS_CUMULATIVE_ACK ))
	{
		p->packFooter( inSeqAt_ );
	}

	if (p->hasFlags( Packet::FLAG_HAS_ACKS ))
	{
		// Note: Technically we should start at inSeqAt_ since sequence numbers
		// wrap around but this is rare enough not to worry about (since it
		// still works but is less efficient).
		p->packFooter( (Packet::AckCount)p->nAcks() );
		uint acksAdded = 0;
		while (!acksToSend_.empty() && acksAdded < p->nAcks())
		{
			p->packFooter( *acksToSend_.begin() );
			acksToSend_.erase( acksToSend_.begin() );
			++acksAdded;
		}
	}
}

Because ACK information rides inside Packets, an ACK whose carrier Packet is lost will still reach the peer via timeout retransmission. There is a whiff of recursion here: reliable delivery depends on ACKs, and the delivery of ACKs in turn depends on reliable delivery.

UDPChannel接收到一个携带了Packet::FLAG_HAS_CUMULATIVE_ACK或者Packet::FLAG_HAS_ACKSPacket时,就会从footer里解析出这些ACK的包序号,执行之前我们介绍过的handleCumulativeAckhandleAck:

Reason PacketReceiver::processFilteredPacket( const Address & addr,
		Packet * p, ProcessSocketStatsHelper * pStatsHelper )
{
	// many lines omitted
	if (p->hasFlags( Packet::FLAG_HAS_CUMULATIVE_ACK ))
	{
		if (!pChannel)
		{
			// error log omitted
		}

		SeqNum endSeq;

		if (!p->stripFooter( endSeq ))
		{
			// error log omitted
		}

		if (!pChannel->handleCumulativeAck( endSeq ))
		{
			RETURN_FOR_CORRUPTED_PACKET();
		}
	}
	// Strip and handle ACKs
	if (p->hasFlags( Packet::FLAG_HAS_ACKS ))
	{
		if (!p->stripFooter( p->nAcks() ))
		{
			// error log omitted
		}

		if (p->nAcks() == 0)
		{
			// error log omitted
		}

		// The total size of all the ACKs on this packet
		int ackSize = p->nAcks() * sizeof( SeqNum );

		// check that we have enough footers to account for all of the
		// acks the packet claims to have (thanks go to netease)
		if (p->bodySize() < ackSize)
		{
			// error log omitted
		}

		// For each ACK that we receive, we no longer need to store the
		// corresponding packet.
		if (pChannel)
		{
			for (uint i=0; i < p->nAcks(); i++)
			{
				SeqNum seq;

				if (!p->stripFooter( seq ))
				{
					// error log omitted
				}

				if (!pChannel->handleAck( seq ))
				{
					// error log omitted
				}
			}
		}
		// many lines omitted
	}
	// many lines omitted
}

That completes the ACK send and receive machinery. Together with timeout retransmission and the sliding window, UDPChannel essentially emulates reliable TCP on top of UDP.