BigWorld的属性同步
属性定义
在Bigworld中,每一种Entity的具体类型都会有一个对应的entitydef文件来描述其所携带的所有属性以及在其身上声明的所有RPC方法,其文本描述格式为xml。在programming/bigworld/examples/client_integration/python/simple/res/scripts/entity_defs/这个文件夹里
提供了一些具体的Entity定义文件的样例,下面就是其中比较简单的Account.def的内容,:
<root>
<Properties>
<playerName>
<Type> STRING </Type>
<Flags> BASE_AND_CLIENT </Flags>
<Persistent> true </Persistent>
<Editable> true </Editable>
</playerName>
<character>
<Type> STRING </Type>
<Flags> BASE </Flags>
<Persistent> true </Persistent>
<Editable> true </Editable>
</character>
</Properties>
<ClientMethods>
<chatMessage>
<Arg> STRING </Arg>
</chatMessage>
</ClientMethods>
<CellMethods>
</CellMethods>
<BaseMethods>
</BaseMethods>
</root>
在这个entitydef文件里,主要有四个Section:
Properties,代表这个Entity类型所承载的属性,这里以Array形式来声明每个属性,每个属性有四个字段:Type这个属性的值类型,常见值类型包括INT,STRING等,支持Array,Map容器类型,同时支持自定义类型,Flags这个属性的可见性的BitMask,主要有三个Bit来组合,BASE代表Base对象中可见,ClIENT代表自身客户端可见,CELL代表Entity可见,ALL_CLIENTS代表AOI内所有客户端可见,Persistent一个Bool值,代表这个属性是否需要存库Editable,一个Bool值,代表这个属性是否可以在运行时修改,
ClientMethods以Array的形式来声明客户端上可以调用的RPC,RPC的每个参数按照顺序来填充ARG字段,可以填多个CellMethods,以Array的形式来声明CellEntity上可以调用的RPC,RPC的每个参数按照顺序来填充ARG字段,可以填多个BaseMethods,以Array的形式来声明CellEntity上可以调用的RPC,RPC的每个参数按照顺序来填充ARG字段,可以填多个
当前我们重点关注Properties字段,在上面的样例文件里声明了两个字符串属性,playerName与character,都比较简单。在同目录下的ClientAvatar.def里属性就比较复杂了,多了很多字段:
<root>
<Volatile>
<position/>
<yaw/>
<pitch> 20 </pitch>
</Volatile>
<Properties>
<playerName>
<Type> STRING </Type>
<Flags> ALL_CLIENTS </Flags>
<Persistent> true </Persistent>
<Editable> true </Editable>
<Identifier> true </Identifier>
</playerName>
<prop1>
<Type> INT32 </Type>
<Flags> BASE_AND_CLIENT </Flags>
<Default> 1 </Default>
</prop1>
<prop2>
<Type> INT16 </Type>
<Flags> ALL_CLIENTS </Flags>
<Default> 2 </Default>
<Persistent> true </Persistent>
</prop2>
<prop3>
<Type> INT8 </Type>
<Flags> OTHER_CLIENTS </Flags>
<Default> 3 </Default>
</prop3>
<prop4>
<Type> INT8 </Type>
<Flags> CELL_PRIVATE </Flags>
<Default> 4 </Default>
</prop4>
<prop5>
<Type> USER_TYPE <implementedBy> TestDataType.instance </implementedBy> </Type>
<Flags> ALL_CLIENTS </Flags>
<Persistent> true </Persistent>
</prop5>
<_timer1>
<Type> INT32 </Type>
<Flags> CELL_PRIVATE </Flags>
</_timer1>
<_timer2>
<Type> INT32 </Type>
<Flags> CELL_PRIVATE </Flags>
</_timer2>
<_timerTick>
<Type> INT32 </Type>
<Flags> CELL_PRIVATE </Flags>
</_timerTick>
</Properties>
</root>
这里最值得注意的是属性的类型并不限制于编程语言里常见的值类型,我们可以使用自己创建的脚本类型。例如prop5这个属性的类型声明为USER_TYPE,代表一个在python脚本里定义的类型,这个自定义类型由TestDataType.instance来实现,下面就是整个TestDataType.py的内容:
import struct
class Test( object ):
def __init__( self, intValue, stringValue, dictValue ):
self.intValue = intValue
self.stringValue = stringValue
self.dictValue = dictValue
def writePascalString( string ):
return struct.pack( "b", len(string) ) + string
def readPascalString( stream ):
(length,) = struct.unpack( "b", stream[0] )
string = stream[1:length+1]
stream = stream[length+1:]
return (string, stream)
class TestDataType:
def addToStream( self, obj ):
if not obj:
obj = self.defaultValue()
stream = struct.pack( "i", obj.intValue )
stream += writePascalString( obj.stringValue )
stream += struct.pack( "i", len( obj.dictValue.keys() ) )
for key in obj.dictValue.keys():
stream += writePascalString( key )
stream += writePascalString( obj.dictValue[key] )
return stream
def createFromStream( self, stream ):
(intValue,) = struct.unpack( "i", stream[:4] )
stream = stream[4:]
stringValue, stream = readPascalString( stream )
dictValue = {}
(dictSize,) = struct.unpack( "i", stream[:4] )
stream = stream[4:]
while len( stream ):
key, stream = readPascalString( stream )
value, stream = readPascalString( stream )
dictValue[key] = value
return Test( intValue, stringValue, dictValue )
def addToSection( self, obj, section ):
if not obj:
obj = self.defaultValue()
section.writeInt( "intValue", obj.intValue )
section.writeString( "stringValue", obj.stringValue )
s = section.createSection( "dictValue" )
for key in obj.dictValue.keys():
v = s.createSection( "value" )
print key, obj.dictValue[key]
v.writeString( "key", key )
v.writeString( "value", obj.dictValue[key] )
def createFromSection( self, section ):
intValue = section.readInt( "intValue" )
if intValue is None:
return self.defaultValue()
stringValue = section.readString( "stringValue" )
dictValue = {}
for value in section["dictValue"].values():
dictValue[value["key"].asString] = value["value"].asString
return Test( intValue, stringValue, dictValue )
def fromStreamToSection( self, stream, section ):
o = self.createFromStream( stream )
self.addToSection( o, section )
def fromSectionToStream( self, section ):
o = self.createFromSection( section )
return self.addToStream( o )
def bindSectionToDB( self, binder ):
binder.bind( "intValue", "INT32" )
binder.bind( "stringValue", "STRING", 50 )
binder.beginTable( "dictValue" )
binder.bind( "key", "STRING", 50 )
binder.bind( "value", "STRING", 50 )
binder.endTable()
def defaultValue( self ):
return Test( 100, "Blah", { "happy": "sad", "naughty": "nice",
"coopers": "carlton" } )
instance = TestDataType()
# TestDataType.py
从这个简单的Python文件内容可以看到,我们需要为脚本提供的数据类型为Test,有三个字段intValue, stringValue, dictValue。但是为了向属性系统注册,需要提供一个额外的属性定义辅助类型TestDataType,通过实现这几个接口来向属性系统注册Test:
bindSectionToDB,这个接口提供当前属性在与数据库进行交互时的类型映射defaultValue,初始情况下这个属性类型的默认值fromStreamToSection,从二进制流里如何解析出当前属性fromSectionToStream,在属性系统里将当前属性序列化为二进制流
整个属性系统其实是CPP提供的基础实现,并加上了一些方便使用者去编辑的属性定义文件。实际在运行时使用Entity的时候,这些属性的声明都会转换为DataDescription这个CPP类型,下面就是这个CPP类型的一些核心变量声明,基本可以对的上XML里和Python里的相关字段:
DataTypePtr pDataType_;
int dataFlags_;
ScriptObject pInitialValue_;
DataSectionPtr pDefaultSection_;
int index_;
int localIndex_; // Index into local prop value vector.
int eventStampIndex_; // Index into time-stamp vector.
int clientServerFullIndex_;
int detailLevel_;
int databaseLength_;
DatabaseIndexingType databaseIndexingType_;
bool hasSetterCallback_;
bool hasNestedSetterCallback_;
bool hasSliceSetterCallback_;
BW::string componentName_;
我们在mosaic_game里属性系统是直接利用声明的CPP结构体类型来作为数据存储容器,并通过libclang来自动生成一些辅助接口来使用的,整体的原理非常简单。但是Bigworld里的属性系统的实现其实非常复杂,他为每个Entity类型构造了一个脚本系统的描述结构EntityDescription,其中属性部分的逻辑都在EntityDescription的父类BaseUserDataObjectDescription里:
/**
* This class is the entity type of a base
*/
class EntityType : public ReferenceCount
{
// 省略很多代码
public:
const EntityDescription & description() const
{ return entityDescription_; }
};
/**
* This class is used to describe a type of entity. It describes all properties
* and methods of an entity type, as well as other information related to
* object instantiation, level-of-detail etc. It is normally created on startup
* when the entities.xml file is parsed.
*
* @ingroup entity
*/
class EntityDescription: public BaseUserDataObjectDescription
{
};
/**
* This class is used to describe a type of User Data Object. It describes all properties
* a chunk item. It is normally created on startup when the user data objects.xml file is parsed.
*
* @ingroup udo
*/
class BaseUserDataObjectDescription
{
public:
BaseUserDataObjectDescription();
virtual ~BaseUserDataObjectDescription();
// TODO: Move this to UserDataObjectDescription
bool parse( const BW::string & name,
DataSectionPtr pSection = NULL );
void addToDictionary( DataSectionPtr pSection, ScriptDict sDict ) const;
BWENTITY_API const BW::string& name() const;
BWENTITY_API unsigned int propertyCount() const;
BWENTITY_API DataDescription* property( unsigned int n ) const;
BWENTITY_API DataDescription* findProperty(
const char * name, const char * component = "" ) const;
DataDescription* findCompoundProperty( const char * name ) const;
protected:
virtual bool parseProperties( DataSectionPtr pProperties,
const BW::string & componentName ) = 0;
virtual bool parseInterface( DataSectionPtr pSection,
const char * interfaceName,
const BW::string & componentName );
virtual bool parseImplements( DataSectionPtr pInterfaces,
const BW::string & componentName );
virtual const BW::string getDefsDir() const = 0;
BW::string name_;
typedef BW::vector< DataDescription > Properties;
Properties properties_;
typedef StringMap< unsigned int > PropertyMap;
PropertyMap & getComponentProperties( const char * component );
#ifdef EDITOR_ENABLED
BW::string editorModel_;
#endif
private:
typedef StringMap< PropertyMap > ComponentPropertyMap;
ComponentPropertyMap propertyMap_;
};
在BaseUserDataObjectDescription里使用数组类型BW::vector< DataDescription > properties_;来存储所有的属性,同时对外提供属性结构的查询接口来隐藏底层的实现。然后properties_里的每个元素DataDescription的类型描述结构DataTypePtr不仅仅可以声明为INT,STRING等基础类型,还可以声明为数组、字典等复杂类型,甚至还能支持在Python里定义结构体。复杂类型又可以嵌套, 这样嵌套起来就能组合成一个非常有表现力的属性元数据描述系统。整个属性系统的实现被封装为了一个lib,放在programming/bigworld/lib/entitydef文件夹下,有兴趣的读者可以去研究一下其完整的实现。
这里我们先忽略掉整个属性系统的完整实现,主要考虑在Entity上对这个属性系统的数据读写操作以及后续的同步。首先需要关注的是当一个Entity从数据库中加载出来时的属性解析处理,这部分的代码在Base::init中:
/**
* This method initialises this base object.
*
* @param pDict The dictionary for the script object or NULL.
* The property values of this dictionary are assumed to be
* of correct types for this entity type.
* @param pCellData A dictionary containing the values to create the cell
* entity with, or NULL.
* @param pLogOnData A string containing a value provided by the billing
* system if this is a Proxy crated by player login, or
* NULL.
*
* @return True if successful, otherwise false.
*/
bool Base::init( PyObject * pDict, PyObject * pCellData,
const BW::string * pLogOnData )
{
MF_ASSERT( !PyErr_Occurred() );
MF_ASSERT( pLogOnData == NULL || this->isProxy() );
// creation of entity delegate
if (!this->initDelegate( /*templateID*/ "" ))
{
ERROR_MSG( "Base::init(%d): Failed to initialise delegate\n", id_);
return false;
}
if (pDict)
{
ScriptDict dict( pDict, ScriptDict::FROM_BORROWED_REFERENCE );
ScriptDict::size_type pos = 0;
// populate entity properties
for (ScriptObject key, value; dict.next( pos, key, value ); )
{
ScriptString propertyName = key.str( ScriptErrorPrint() );
const char * attr = propertyName.c_str();
DataDescription* pDataDescription =
this->pType()->description().findCompoundProperty( attr );
if (pDataDescription &&
pDataDescription->isComponentised() &&
pDataDescription->isBaseData())
{
continue; // skip base components properties
}
if (!this->assignAttribute( propertyName, value, pDataDescription ))
{
ERROR_MSG( "Base::init(%d): Failed to assign '%s'\n",
id_, attr );
Script::printError();
}
}
// populate components properties
if (!populateDelegateWithDict( this->pType()->description(),
pEntityDelegate_.get(), dict, EntityDescription::BASE_DATA ))
{
ERROR_MSG("Base::init(%d): Failed to populate delegate with data\n",
id_);
return false;
}
}
// 省略属性无关逻辑代码
return true;
}
这里的pDict就是属性在Python脚本里解析出来的顶层Dict,有了这个数据Dict和当前Entity的属性结构this->pType()->description()之后,就开始遍历Dict里的每个元素,找到每个元素对应的属性定义,使用assignAttribute来做CPP属性字典的初始化:
/**
* Assigns an attribute, canonicalising the data type if it is a property.
*
* If the value is of incorrect type, a TypeError is raised.
*
* @param attr The name of the property to assign.
* @param value The value of the property to assign.
* @param pDataDescription The data description for the name. If NULL,
* the potential property is looked up from the
* attribute. If no such property exists, a normal
* attribute assignment will be performed.
*
* @return 0 on success, otherwise -1 with the Python error state raised.
*/
bool Base::assignAttribute( const ScriptString & attrObj,
const ScriptObject & value, DataDescription * pDataDescription )
{
if ((pDataDescription == NULL) || !pDataDescription->isBaseData())
{
// 忽略一些异常情况
}
ScriptObject pRealNewValue( value );
pRealNewValue = pDataDescription->dataType()->attach(
pRealNewValue, NULL, 0 );
if (!pRealNewValue)
{
PyErr_Format( PyExc_TypeError,
"%s.%s is a BASE property and must be set to the "
"correct type - %s",
this->pType()->name(),
pDataDescription->name().c_str(),
pDataDescription->dataType()->typeName().c_str() );
return false;
}
return this->PyObjectPlus::pySetAttribute( attrObj, pRealNewValue );
}
在assignAttribute里会使用pDataDescription->dataType()->attach对传入过来的value做属性类型匹配检查,通过检查之后才能真正的设置到属性字典里。
从上面的属性初始化流程可以看出,Bigworld的属性存库数据应该都是由Python脚本序列化的,具体序列化格式完全由Python脚本控制。运行时的属性系统虽然底层是CPP实现的,但是每个属性字段的值类型都是一个PyObject,属性系统框架负责维护属性值类型与属性定义类型的匹配,这样避免在使用CPP公开的属性操作接口时出现错误。
属性修改回调
属性系统除了管理属性的序列化和反序列化之外,最重要的功能其实是属性修改的通知回调,因为很多属性修改之后需要同步到自身客户端、他人客户端等终端设备上。在Entity上提供了下面的两个脚本系统可以调用的属性读写接口:
ScriptObject pyGetAttribute( const ScriptString & attrObj );
bool pySetAttribute( const ScriptString & attrObj,
const ScriptObject & value );
这里我们只关心pySetAttribute这个接口,实现上很简单,根据传入的属性名找到对应的属性字段描述DataDescription,然后利用changeOwnedProperty来执行数据类型判定并修改:
/**
* This method is responsible for setting script attributes associated with
* this object.
*/
bool Entity::pySetAttribute( const ScriptString & attrObj,
const ScriptObject & value )
{
const char * attr = attrObj.c_str();
if (!this->isRealToScript())
{
// 省略非real entity修改属性时的报错
}
// see if it's one of the entity's properties
DataDescription * pDescription = pEntityType_->description( attr,
/*component*/"" );
if (pDescription != NULL && pDescription->isCellData())
{
// 忽略数据库索引字段的处理
int cellIndex = pDescription->localIndex();
ScriptObject pOldValue = properties_[cellIndex];
DataType & dataType = *pDescription->dataType();
if (!propertyOwner_.changeOwnedProperty(
properties_[ cellIndex ],
value, dataType, cellIndex ))
{
// 忽略一些报错代码
}
return true;
}
// 忽略一些代码
}
这里的propertyOwner_是一个比较特殊的成员变量,与properties_同时在Entity里被声明:
typedef BW::vector<ScriptObject> Properties;
Properties properties_;
PropertyOwnerLink<Entity> propertyOwner_;
这个PropertyOwnerLink相当于properties里的一级属性的数组,继承自PropertyOwnerBase, 在PropertyOwnerBase上提供了changeOwnedProperty这个接口来修改一个属性:
/**
* This base class is an object that can own properties.
*/
class PropertyOwnerBase
{
public:
virtual ~PropertyOwnerBase() { }
/**
* This method is called by a child PropertyOwnerBase to inform us that
* a property has changed. Each PropertyOwner should pass this to their
* parent, adding their index to the path, until the Entity is reached.
*
* @return true if succeeded, false if an exception was raised
*/
virtual bool onOwnedPropertyChanged( PropertyChange & change )
{
return true;
}
// 忽略一些虚接口声明
/**
* This method modifies a property owned by this object.
*
* @param rpOldValue A reference that will be populated with the old value.
* @param pNewValue The new value to set the property to.
* @param dataType The type of the property being changed.
* @param index The index of the property being changed.
* @param forceChange If true, the change occurs even if the old and new
* values are the same.
*/
bool changeOwnedProperty( ScriptObject & rpOldValue, ScriptObject pNewValue,
const DataType & dataType, int index,
bool forceChange = false );
};
/**
* This class specialises PropertyOwnerBase to add functionality for top-level
* Property Owners. That is Entity.
*/
class TopLevelPropertyOwner : public PropertyOwnerBase
{
public:
bool setPropertyFromInternalStream( BinaryIStream & stream,
ScriptObject * ppOldValue, ScriptList * ppChangePath,
int rootIndex, bool * pIsSlice );
int setNestedPropertyFromExternalStream(
BinaryIStream & stream, bool isSlice,
ScriptObject * ppOldValue, ScriptList * ppChangePath );
private:
virtual ScriptObject getPyIndex( int index ) const
{
MF_ASSERT( 0 );
return ScriptObject();
}
};
/**
* This is a handy linking class for objects that dislike virtual functions.
*/
template <class C>
class PropertyOwnerLink : public TopLevelPropertyOwner
{
public:
PropertyOwnerLink( C & self ) : self_( self ) { }
virtual bool onOwnedPropertyChanged( PropertyChange & change )
{
return self_.onOwnedPropertyChanged( change );
}
// 省略一些接口声明
private:
C & self_;
};
当changeOwnedProperty被调用到的时候,会判断新值和旧值是否相等,然后再用之前介绍过的attach接口来做新值的类型判定并执行赋值操作:
/**
* This method changes a property owned by this one. It propagates this change
* to the top-level owner.
*/
bool PropertyOwnerBase::changeOwnedProperty( ScriptObject & rpOldValue,
ScriptObject pNewValue, const DataType & dataType, int index,
bool forceChange )
{
if (dataType.canIgnoreAssignment( rpOldValue, pNewValue ))
{
return true;
}
bool changed = forceChange ||
dataType.hasChanged( rpOldValue, pNewValue );
SinglePropertyChange change( index, this->getNumOwnedProperties(), dataType );
PropertyOwnerBase * pTopLevelOwner = NULL;
if (changed)
{
if (!this->getTopLevelOwner( change, pTopLevelOwner ))
{
return false;
}
}
// TODO: attach() should be const
ScriptObject pRealNewValue =
const_cast< DataType & >( dataType ).attach( pNewValue, this, index );
if (!pRealNewValue)
{
return false;
}
const_cast< DataType & >( dataType ).detach( rpOldValue );
rpOldValue = pRealNewValue;
if (pTopLevelOwner != NULL)
{
change.setValue( pRealNewValue );
if (!pTopLevelOwner->onOwnedPropertyChanged( change ))
{
return false;
}
}
return true;
}
当赋值操作attach执行成功之后,会使用一个SinglePropertyChange的结构体来记录这个赋值操作,然后再通过onOwnedPropertyChanged这个接口来触发外层的属性修改后回调:
/**
* This method is called by a child PropertyOwnerBase to inform us that
* a property has changed. Each PropertyOwner should pass this to their
* parent, adding their index to the path, until the Entity is reached.
*
* @return true if succeeded, false if an exception was raised
*/
virtual bool onOwnedPropertyChanged( PropertyChange & change )
{
return true;
}
注意看这里的注释,可以看出这里定位一个属性的方法也跟我们之前在mosaic_game里设计的属性索引系统一样,每一层属性都可以看作一个数组,每一个属性都会记录自己在上一层属性数组里的索引值。当一个属性被修改之后,构造出来的PropertyChange结构体里会使用数组ChangePath来维护各个层级的索引,每次一个属性被修改之后,其构造的PropertyChange会通过getTopLevelOwner将属性层级一路传递到Entity上,在这传递的过程中addToPath会被调用,path_就会被不断的添加每一层的属性索引:
/**
* This class represents a change to a property of an entity.
*/
class PropertyChange
{
public:
void addToPath( int index, int indexLength )
{
path_.push_back( ChangePath::value_type( index, indexLength ) );
}
protected:
// A sequence of child indexes ordered from the leaf to the root
// (i.e. entity). For example, 3,4,6 would be the 6th property of the
// entity, the 4th "child" of that property and then the 3rd "child".
// E.g. If the 6th property is a list of lists called myList, this refers
// to entity.myList[4][3]
typedef BW::vector< std::pair< int32, int32 > > ChangePath;
ChangePath path_; //< Path to the owner being changed.
};
/**
* This class is a specialised PropertyChange. It represents a single value of
* an entity changing.
*/
class SinglePropertyChange : public PropertyChange
{
};
/**
* One of our properties is telling us it's been changed internally.
*/
bool IntermediatePropertyOwner::getTopLevelOwner( PropertyChange & change,
PropertyOwnerBase *& rpTopLevelOwner )
{
if (this->shouldPropagate())
{
change.isNestedChange( true );
bool result = pOwner_->getTopLevelOwner( change, rpTopLevelOwner );
if (rpTopLevelOwner != pOwner_)
{
change.addToPath( ownerRef_, pOwner_->getNumOwnedProperties() );
}
else
{
change.rootIndex( ownerRef_ );
}
return result;
}
return true;
}
在mosaic_game里我们强制要求了属性层级不能超过7级,且单层属性结构体里的属性成员个数不能超过254,这样就可以将这个属性层级数组压缩为一个八字节的std::uint64_t,相对于这里的vector来说节省了很多空间。
对于PropertyOwnerLink来说,属性修改后的通知回调会调用Entity::onOwnedPropertyChanged:
template <class C>
class PropertyOwnerLink : public TopLevelPropertyOwner
{
public:
PropertyOwnerLink( C & self ) : self_( self ) { }
virtual bool onOwnedPropertyChanged( PropertyChange & change )
{
return self_.onOwnedPropertyChanged( change );
}
};
/**
* This method is called when a property owned by this entity changes. It may
* be a top-level property, a property nested inside an array etc, or even a
* change in a slice of an array.
*/
bool Entity::onOwnedPropertyChanged( PropertyChange & change )
{
const DataDescription * pDescription =
pEntityType_->propIndex( change.rootIndex() );
return this->onOwnedPropertyChanged( pDescription, change );
}
这第二个onOwnedPropertyChanged函数重载就是属性修改后执行网络同步的最核心代码,这个函数的实现有点长,这里我们先简略的展示其大概,具体的内容放在后面的小结中展开介绍:
bool Entity::onOwnedPropertyChanged( const DataDescription * pDescription,
PropertyChange & change )
{
if (!this->isRealToScript())
{
PyErr_Format( PyExc_AttributeError,
"Can't change defined property %s.%s on ghost %d\n",
this->pType()->name(), pDescription->name().c_str(), id_ );
return false;
}
if (this->cell().pReplayData())
{
// 将属性修改推送到录像系统
this->cell().pReplayData()->addEntityProperty( *this, *pDescription,
change );
}
if (pDescription->isGhostedData()) // 如果当前属性需要同步到ghost entity
{
// If the data is for other clients, add an event to our history.
if (pDescription->isOtherClientData())
{
// 如果这个属性是AOI内其他客户端可见 则先在这个分支处理处理一下
}
// Send the new data to all our ghosts
// 然后发送这个属性通知到当前的所有ghost里
}
// If the data is for our own client, add it to our bundle
if (pDescription->isOwnClientData() && pReal_->pWitness() != NULL)
{
// 这个分支处理当前自身客户端可见的属性同步
}
return true;
}
Real-Ghost之间的属性同步
当RealEntity决定需要往周围的一个Cell里创建GhostEntity的时候,会调用Entity::createGhost:
/**
* This method adds a createGhost message to a bundle.
*/
void Entity::createGhost( Mercury::Bundle & bundle )
{
bundle.startMessage( CellAppInterface::createGhost );
bundle << this->cell().space().id();
this->writeGhostDataToStream( bundle );
}
/**
* This method puts the variable state data onto the stream that initGhost
* expects to take off.
*
* @param stream The stream to put the data on.
*/
void Entity::writeGhostDataToStream( BinaryOStream & stream ) const
{
// Note: The id and entityTypeID is not read off by readGhostDataFromStream.
// They are read by Space::createGhost
// Note: Also read by BufferGhostMessage to get numTimesRealOffloaded_.
stream << id_ << this->entityTypeID();
CompressionOStream compressionStream( stream,
pEntityType_->description().internalNetworkCompressionType() );
this->writeGhostDataToStreamInternal( compressionStream );
}
createGhost的时候会先调用writeGhostDataToStream,这个函数负责先创建一个带数据压缩的stream,然后再将这个stream传递到writeGhostDataToStreamInternal来填入相关数据。因为GhostEntity的数据一般比较庞大,使用压缩可以降低传输的数据大小。这里的writeGhostDataToStreamInternal除了会把Base的地址和当前RealEntity的地址加入到序列化数据之外,还会遍历当前的所有属性,获取其中与ghost相关的字段,将这些属性字段添加到stream之中:
/**
* This method is called by writeGhostDataToStream once the decision on
* whether or not to compress has been made.
*/
void Entity::writeGhostDataToStreamInternal( BinaryOStream & stream ) const
{
stream << numTimesRealOffloaded_ << localPosition_ << isOnGround_ <<
lastEventNumber_ << volatileInfo_;
stream << CellApp::instance().interface().address();
stream << baseAddr_;
stream << localDirection_;
propertyEventStamps_.addToStream( stream );
TOKEN_ADD( stream, "GProperties" );
// Do ghosted properties dependent on entity type
//this->pType()->addDataToStream( this, stream, DATA_GHOSTED );
// write our ghost properties to the stream
for (uint32 i = 0; i < pEntityType_->propCountGhost(); ++i)
{
MF_ASSERT( properties_[i] );
DataDescription * pDataDesc = pEntityType_->propIndex( i );
// TODO - implement component properties processing here
MF_ASSERT( !pDataDesc->isComponentised() );
ScriptDataSource source( properties_[i] );
if (!pDataDesc->addToStream( source, stream, false ))
{
CRITICAL_MSG( "Entity::writeGhostDataToStream(%u): "
"Could not write ghost property %s.%s to stream\n",
id_, this->pType()->name(), pDataDesc->name().c_str() );
}
}
TOKEN_ADD( stream, "GController" );
this->writeGhostControllersToStream( stream );
TOKEN_ADD( stream, "GTail" );
stream << periodsWithoutWitness_ << aoiUpdateSchemeID_;
}
这里为了方便区分Ghost可见属性与Real可见属性,EntityType在获取了属性定义文件之后,在其构造函数里会优先将Ghost属性放在属性数组的开头,然后再放Real可见属性:
/**
* Constructor
*
* @param entityDescription Entity data description
* @param pType The Python class that is associated with this entity type.
* This object steals the reference from the caller
*/
EntityType::EntityType( const EntityDescription& entityDescription,
PyTypeObject * pType ) :
entityDescription_( entityDescription ),
pPyType_( pType ),
#if !ENABLE_WATCHERS
detailedPositionDescription_( "detailedPosition" ),
#endif
expectsNearbyEntity_( false )
{
MF_ASSERT( !pType ||
(PyType_Check( pType ) &&
PyObject_IsSubclass( (PyObject *)pType,
(PyObject*)&Entity::s_type_ )) );
propCountGhost_ = 0;
for (uint i = 0; i < entityDescription_.propertyCount(); i++)
{
DataDescription * pDesc = entityDescription_.property( i );
if (!pDesc->isCellData()) continue;
if (pDesc->isComponentised()) continue;
if (pDesc->isGhostedData())
{
propDescs_.insert( propDescs_.begin() + propCountGhost_++, pDesc );
}
else
{
propDescs_.push_back( pDesc );
}
}
// 暂时忽略无关代码
}
在这样的排列下,propDescs_[0:propCountGhost_]部分就全都是Ghost可见属性了,所以遍历Ghost可见属性的时候可以无视后面的元素。
其实这里的GhostEntity就是没有RealEntity的Entity,在这个Entity以GhostEntity的形式被创建的时候,会调用initGhost来读取之前传入的相关数据:
/**
* This method creates a ghost entity in this space. This version of
* createGhost already has the entity's id streamed off.
*/
void Space::createGhost( const EntityID entityID, BinaryIStream & data )
{
//ToDo: remove when load balancing is supported on Delegate types
// of physical spaces
if (IGameDelegate::instance() != NULL) {
ERROR_MSG( "Space::createGhost: "
"Currently not supported by Delegate Physical spaces" );
return;
}
AUTO_SCOPED_PROFILE( "createGhost" );
SCOPED_PROFILE( TRANSIENT_LOAD_PROFILE );
// Build up the Entity structure
EntityTypeID entityTypeID;
data >> entityTypeID;
EntityPtr pNewEntity = this->newEntity( entityID, entityTypeID );
pNewEntity->initGhost( data );
Entity::population().notifyObservers( *pNewEntity );
}
/**
* This method should be called on a newly created entity to make it a ghost
* entity. Either this method or initReal should be called immediately after
* the constructor.
*
* @see initReal
*/
void Entity::initGhost( BinaryIStream & data )
{
static ProfileVal localProfile( "initGhost" );
START_PROFILE( localProfile );
int dataSize = data.remainingLength();
this->createEntityDelegate();
this->readGhostDataFromStream( data );
// 省略后续代码
}
initGhost负责调用readGhostDataFromStream将之前writeGhostDataToStream压入的数据先执行解压缩,然后再通过readGhostDataFromStreamInternal解析出来:
/**
* This method reads ghost data from the input stream. This matches the data
* that was added by writeGhostDataToStream. It is called by initGhost.
*
* @see writeGhostDataToStream
* @see initGhost
*/
void Entity::readGhostDataFromStream( BinaryIStream & data )
{
CompressionIStream compressionStream( data );
this->readGhostDataFromStreamInternal( compressionStream );
}
/**
* This method is called by readGhostDataFromStream once the decision on
* whether or not to uncompress has been made.
*/
void Entity::readGhostDataFromStreamInternal( BinaryIStream & data )
{
// This was streamed on by Entity::writeGhostDataToStream.
data >> numTimesRealOffloaded_ >> localPosition_ >> isOnGround_ >>
lastEventNumber_ >> volatileInfo_;
eventHistory_.lastTrimmedEventNumber( lastEventNumber_ );
globalPosition_ = localPosition_;
// Initialise the structure that stores the time-stamps for when
// clientServer properties were last changed.
propertyEventStamps_.init( pEntityType_->description() );
Mercury::Address realAddr;
data >> realAddr;
pRealChannel_ = CellAppChannels::instance().get( realAddr );
data >> baseAddr_;
data >> localDirection_;
globalDirection_ = localDirection_;
propertyEventStamps_.removeFromStream( data );
TOKEN_CHECK( data, "GProperties" );
// Read in the ghost properties
MF_ASSERT( properties_.size() == pEntityType_->propCountGhost() );
for (uint32 i = 0; i < properties_.size(); ++i)
{
DataDescription & dataDescr = *pEntityType_->propIndex( i );
// TODO - implement component properties processing here
MF_ASSERT( !dataDescr.isComponentised() );
DataType & dt = *dataDescr.dataType();
// read and attach the property
ScriptDataSink sink;
MF_VERIFY( dt.createFromStream( data, sink,
/* isPersistentOnly */ false ) );
ScriptObject value = sink.finalise();
if (!(properties_[i] = dt.attach( value, &propertyOwner_, i )))
{
CRITICAL_MSG( "Entity::initGhost(%u):"
"Error streaming off entity property %u\n", id_, i );
}
}
// 省略后续代码
}
在这个readGhostDataFromStreamInternal也会遍历properties_[0:propCountGhost]这些属性,然后依次的解析之前序列化进入的属性数据。这样就完成了创建GhostEntity时,当前所有的GhostEntity可见属性与RealEntity上的属性的初次同步。
然后在后续的属性修改回调中,如果发现修改的属性是GhostEntity可见的属性,则会将这个属性修改信息以CellAppInterface::ghostedDataUpdate这个RPC从RealEntity同步到所有的GhostEntity上:
bool Entity::onOwnedPropertyChanged( const DataDescription * pDescription,
PropertyChange & change )
{
// 省略开头的一些判断
if (pDescription->isGhostedData())
{
// If the data is for other clients, add an event to our history.
if (pDescription->isOtherClientData())
{
// 省略其他客户端可见属性的相关代码
}
// Send the new data to all our ghosts
RealEntity::Haunts::iterator iter = pReal_->hauntsBegin();
while (iter != pReal_->hauntsEnd())
{
Mercury::Bundle & bundle = iter->bundle();
#if ENABLE_WATCHERS
int oldBundleSize = bundle.size();
#endif
bundle.startMessage( CellAppInterface::ghostedDataUpdate );
bundle << this->id();
bundle << int32( pDescription->index() );
change.addToInternalStream( bundle );
#if ENABLE_WATCHERS
pDescription->stats().countSentToGhosts( bundle.size() - oldBundleSize );
pEntityType_->stats().countSentToGhosts( bundle.size() - oldBundleSize );
#endif
++iter;
}
}
// 省略无关代码
return true;
}
当GhostEntity接收到这个ghostedDataUpdate的消息之后,会首先解析被修改属性的index,然后查找对应的属性描述信息DataDescrition,最后使用setPropertyFromInternalStream这个接口来执行属性回放:
/**
* This method handles a message that is used to change property data on a
* ghost.
*/
void Entity::ghostedDataUpdate( BinaryIStream & data )
{
MF_ASSERT( !this->isReal() );
uint32 dataSize = data.remainingLength();
int32 propertyIndex;
data >> propertyIndex;
const DataDescription * pDescription =
pEntityType_->description().property( propertyIndex );
MF_ASSERT( pDescription != NULL );
if (pDescription->isComponentised())
{
// TODO: Handle component stream
CRITICAL_MSG( "Entity::ghostedDataUpdate: "
"Unable to handle component update\n" );
}
else
{
ScriptObject pOldValue = ScriptObject::none();
ScriptList pChangePath = ScriptList();
bool isSlice = false;
bool success = propertyOwner_.setPropertyFromInternalStream( data,
&pOldValue, &pChangePath,
pDescription->localIndex(),
&isSlice );
pDescription->stats().countReceived( dataSize );
if (!success)
{
ERROR_MSG( "Entity::ghostedDataUpdate: Failed for %s.%s id = %d\n",
pEntityType_->name(),
pDescription->name().c_str(),
id_ );
return;
}
pDescription->callSetterCallback(
ScriptObject( this, ScriptObject::FROM_BORROWED_REFERENCE ),
pOldValue, pChangePath, isSlice );
}
}
属性回放接口setPropertyFromInternalStream会根据这个index来执行setOwnedProperty操作:
/**
* This method sets an owned property from a stream that has been sent within
* the server.
*/
bool TopLevelPropertyOwner::setPropertyFromInternalStream(
BinaryIStream & stream,
ScriptObject * ppOldValue,
ScriptList * ppChangePath,
int rootIndex,
bool * pIsSlice)
{
int8 flags;
stream >> flags;
bool isSlice = (flags & PropertyChange::FLAG_IS_SLICE) != 0;
bool isNested = (flags & PropertyChange::FLAG_IS_NESTED) != 0;
if (pIsSlice)
{
*pIsSlice = isSlice;
}
if (isNested)
{
PropertyChangeReader * pReader = getPropertyChangeReader( isSlice );
return pReader->readSimplePathAndApply( stream,
this->getChildPropertyOwner( rootIndex ),
ppOldValue, ppChangePath );
}
MF_ASSERT( !isSlice );
// See PropertyChangeReader::doApply and SinglePropertyChangeReader::apply()
ScriptObject pOldValue = this->setOwnedProperty( rootIndex, stream );
if (!pOldValue)
{
ERROR_MSG( "TopLevelPropertyOwner::setPropertyFromInternalStream: "
"Old value is NULL\n" );
}
else if (ppOldValue)
{
*ppOldValue = pOldValue;
}
return true;
}
如果当前修改的就是顶层属性,那么就会简单的调用setOwnedProperty来完成属性回放,这个setOwnedProperty又会中转回Entity::setOwnedProperty上:
/**
* This is a handy linking class for objects that dislike virtual functions.
*/
template <class C>
class PropertyOwnerLink : public TopLevelPropertyOwner
{
public:
virtual ScriptObject setOwnedProperty( int ref, BinaryIStream & data )
{
return self_.setOwnedProperty( ref, data );
}
private:
C & self_;
}
/**
* This method is called to change the given property.
*/
ScriptObject Entity::setOwnedProperty( int ref, BinaryIStream & data )
{
DataDescription* pDataDesc = pEntityType_->propIndex( ref );
DataType & dt = *pDataDesc->dataType();
// reconstruct the python value from the stream
ScriptDataSink sink;
if (!dt.createFromStream( data, sink, /* isPersistentOnly */ false ) )
{
return ScriptObject();
}
ScriptObject pNewValue = sink.finalise();
if (!pNewValue)
{
return pNewValue;
}
// detach the old value and attach the new one
ScriptObject & pSlotRef = properties_[ref];
ScriptObject pOldValue = pSlotRef;
if (pSlotRef != pNewValue) // hey, it could happen!
{
dt.detach( pSlotRef );
pSlotRef = dt.attach( pNewValue, &propertyOwner_, ref );
}
return pOldValue;
}
这里createFromStream负责解析数据并验证格式匹配,最后的attach操作将解析出来的脚本对象赋值到要修改的属性上面。
如果修改的是嵌套的属性,则会调用readSimplePathAndApply来处理这个复杂情况:
/**
* This method reads and applies a property change.
*
* @param stream The stream to read from.
* @param pOwner The top-level owner of the property.
* @param ppOldValue If not NULL, this is the old value of the property.
* @param ppChangePath If not NULL, this is the change path to use when
* applying the property update.
*
* @return True on success, false otherwise
*/
bool PropertyChangeReader::readSimplePathAndApply( BinaryIStream & stream,
PropertyOwnerBase * pOwner,
ScriptObject * ppOldValue,
ScriptList * ppChangePath )
{
uint8 size;
stream >> size;
uint8 i = 0;
while ((i < size) && pOwner)
{
int32 index;
stream >> index;
this->updatePath( ppChangePath, pOwner->getPyIndex( index ) );
pOwner = pOwner->getChildPropertyOwner( index );
++i;
}
if (!pOwner)
{
ERROR_MSG( "PropertyChangeReader::readAndApply: "
"pOwner is NULL. %d/%d.\n",
i, size );
return false;
}
this->readExtraBits( stream );
this->updatePath( ppChangePath );
this->doApply( stream, pOwner, ppOldValue, ppChangePath );
return true;
}
其实这个复杂情况也不怎么复杂,就是先读取之前填充的path索引数组,然后不断的调用getChildPropertyOwner来获取下一层被修改的属性,直到最后一层的时候,调用doApply来执行属性赋值:
/**
* This helper method calls the apply virtual method and performs error
* checking.
*/
void PropertyChangeReader::doApply( BinaryIStream & stream,
PropertyOwnerBase * pOwner, ScriptObject * ppOldValue,
ScriptList * ppChangePath )
{
ScriptObject pOldValue = this->apply( stream, pOwner,
ppChangePath ? *ppChangePath : ScriptList() );
if (!pOldValue)
{
ERROR_MSG( "PropertyChangeReader::readAndApply: Old value is NULL\n" );
}
else if (ppOldValue)
{
*ppOldValue = pOldValue;
}
}
/**
* This method applies this property change.
*
* @param The stream containing the new value.
* @param The low-level owner of the property change.
*
* @return The old value.
*/
ScriptObject SinglePropertyChangeReader::apply( BinaryIStream & stream,
PropertyOwnerBase * pOwner, ScriptList pChangePath )
{
if (pChangePath)
{
pChangePath.append( pOwner->getPyIndex( leafIndex_ ) );
}
return pOwner->setOwnedProperty( leafIndex_, stream );
}
兜兜转转,最后还是走到了setOwnedProperty这个我们之前已经介绍过的接口。
综上所属,GhostEntity在创建的时候就带上了RealEntity里Ghost可见属性的最新副本。后续属性在被修改时,如果发现这个属性是Ghost可见的,则会通过ghostedDataUpdate这个RPC将属性修改记录发送到所有的Ghost身上,并执行属性的修改回放。这两个机制配合起来就实现了GhostEntity与RealEntity之间所有Ghost可见属性的实时同步。
Real-Client之间的属性同步
在之前的玩家进入场景流程部分内容中,我们已经提到此时客户端会首先发送一个enableEntities的请求到对应的Proxy。此时Proxy的enableEntities处理函数会通过addCreateBasePlayerToChannelBundle将当前玩家数据的客户端可见部分打包在一起,并以ClientInterface::createBasePlayer这个RPC通知到客户端:
/**
* This method handles a request from the client to enable or disable updates
* from the cell. It forwards this message on to the cell.
*/
void Proxy::enableEntities()
{
DEBUG_MSG( "Proxy::enableEntities(%u)\n", id_ );
// if this is the first we've heard of it, then send the client the props
// it shares with us, call the base script...
if (!basePlayerCreatedOnClient_)
{
this->addCreateBasePlayerToChannelBundle();
this->sendExposedForReplayClientPropertiesToCell();
// 省略无关代码
}
// 省略无关代码
}
/**
* This method adds the createBasePlayer message to the given bundle
*
* It should immediately follow a successful login or full
* entity reset, so the client is never operating without
* a Base Player entity.
* Note: When this method is called,
* Proxy::sendExposedForReplayClientPropertiesToCell() should be called
* together at the same time.
*
* @param bundle The Mercury::Bundle to add the message to
*/
void Proxy::addCreateBasePlayerToChannelBundle()
{
DEBUG_MSG( "Proxy::addCreateBasePlayerToChannelBundle(%u): "
"Creating player on client\n",
id_ );
MF_ASSERT( pClientChannel_ != NULL );
MF_ASSERT( shouldRunCallbackOnEntitiesEnabled_ == false );
MF_ASSERT( basePlayerCreatedOnClient_ == false );
Mercury::Bundle & bundle = pClientChannel_->bundle();
bundle.startMessage( ClientInterface::createBasePlayer );
bundle << id_ << pType_->description().clientIndex();
this->addAttributesToStream( bundle,
EntityDescription::FROM_BASE_TO_CLIENT_DATA );
shouldRunCallbackOnEntitiesEnabled_ = true;
basePlayerCreatedOnClient_ = true;
}
/**
* This method writes attributes of the entity related to the given dataDomains
* to stream.
*/
bool Base::addAttributesToStream( BinaryOStream & stream, int dataDomains )
{
const EntityDescription & entityDesc = this->pType()->description();
ScriptObject self( this, ScriptObject::FROM_BORROWED_REFERENCE );
ScriptDict attrs = createDictWithAllProperties(entityDesc,
self, pEntityDelegate_.get(), dataDomains);
if (!attrs)
{
return false;
}
return entityDesc.addDictionaryToStream( attrs, stream, dataDomains );
}
客户端连接在收到这个createBasePlayer的RPC之后,先在ServerConnection::createBasePlayer里将这个玩家的标识符与Entity类型解析出来,然后再使用onBasePlayerCreate函数来解析所携带的属性数据:
/**
* This method handles a createPlayer call from the base.
*/
void ServerConnection::createBasePlayer( BinaryIStream & stream )
{
// we have new player id
EntityID playerID = NULL_ENTITY_ID;
stream >> playerID;
INFO_MSG( "ServerConnection::createBasePlayer: id %u\n", playerID );
// this is now our player id
id_ = playerID;
EntityTypeID playerType = EntityTypeID(-1);
stream >> playerType;
if (pHandler_)
{ // just get base data here
pHandler_->onBasePlayerCreate( id_, playerType,
stream );
}
}
/*
* Override from ServerMessageHandler.
*/
void BWServerMessageHandler::onBasePlayerCreate( EntityID id,
EntityTypeID entityTypeID, BinaryIStream & data )
{
entities_.handleBasePlayerCreate( id, entityTypeID, data );
}
/**
*
*/
void BWEntities::handleBasePlayerCreate( EntityID id,
EntityTypeID entityTypeID, BinaryIStream & data )
{
MF_ASSERT( !isLocalEntityID( id ) );
BWEntityPtr pEntity = entityFactory_.create( id, entityTypeID,
NULL_SPACE_ID, data, &connection_ );
if (!pEntity)
{
ERROR_MSG( "BWEntities::handleBasePlayerCreate: Failed.\n" );
return;
}
// We should have been totally reset before seeing this
// TODO: Client-only entities might be okay here...
MF_ASSERT( pPlayer_ == NULL );
MF_ASSERT( activeEntities_.empty() );
MF_ASSERT( appPendingEntities_.empty() );
MF_ASSERT( pendingPassengers_.empty() );
pPlayer_ = pEntity;
pPlayer_->triggerOnBecomePlayer();
}
这里的handleBasePlayerCreate会使用entityFactory来创建entityTypeID对应类型的Entity,创建之后再去解析属性部分:
/**
* This method creates the appropriate subclass of BWEntity based on the
* given entity type ID.
*
* @param entityTypeID The entity type ID.
*
* @return A pointer to a new instance of the appropriate subclass of
* BWEntity, or NULL on error.
*/
BWEntity * BWEntityFactory::create( EntityID id, EntityTypeID entityTypeID,
SpaceID spaceID, BinaryIStream & data, BWConnection * pConnection )
{
BWEntity * pNewEntity = this->doCreate( entityTypeID, pConnection );
if (!pNewEntity)
{
ERROR_MSG( "BWEntityFactory::create: Failed for entity type %d\n",
entityTypeID );
return NULL;
}
if (!pNewEntity->init( id, entityTypeID, spaceID, data ))
{
ERROR_MSG( "BWEntityFactory::create: "
"init failed for %d. entityTypeID = %d\n",
id, entityTypeID );
// Delete the entity.
MF_ASSERT( !pNewEntity->isPlayer() );
pNewEntity->destroyNonPlayer();
BWEntityPtr pDeleted( pNewEntity );
return NULL;
}
pNewEntity->createExtensions( entityExtensionFactoryManager_ );
return pNewEntity;
}
属性部分的解析在当前Entity实例的init函数里,这里的spaceID我们传递的是一个空值,因此会走到initBasePlayerFromStream这个分支:
/**
* This method is called to initialise this entity.
*
* @param entityID The ID of the entity.
* @param entityTypeID The entity Type ID of this entity.
* @param spaceID The ID of the space this entity's cell entity
* resides in, or NULL_SPACE_ID if this is a base
* entity.
* @param data The property data stream.
*/
bool BWEntity::init( EntityID entityID, EntityTypeID entityTypeID,
SpaceID spaceID, BinaryIStream & data )
{
entityID_ = entityID;
entityTypeID_ = entityTypeID;
spaceID_ = spaceID;
bool isOkay = (spaceID == NULL_SPACE_ID) ?
this->initBasePlayerFromStream( data ) :
this->initCellEntityFromStream( data );
return isOkay;
}
bool Entity::initBasePlayerFromStream( BW::BinaryIStream & data )
{
return pPyEntity_->initBasePlayerFromStream( data );
}
bool PyEntity::initBasePlayerFromStream( BinaryIStream & stream )
{
PyObject * pNewDict = this->pEntity()->type().newDictionary( stream,
EntityDescription::FROM_BASE_TO_CLIENT_DATA );
// 先省略一些代码
return true;
}
这里的newDictionary会根据当前顶层的属性DataDescription来通过readStreamToDict解析下发的stream数据:
/**
* This function returns a brand new instance of a dictionary associated with
* this entity type. It streams the properties from the input stream.
* This is only used for creating the player entity.
*/
PyObject * EntityType::newDictionary( BW::BinaryIStream & stream,
int dataDomains ) const
{
BW::ScriptDict pDict = BW::ScriptDict::create();
description_.readStreamToDict( stream, dataDomains, pDict );
return pDict.newRef();
}
/**
* This method removes the data on the input stream and sets values on the
* input dictionary.
*/
bool EntityDescription::readStreamToDict( BinaryIStream & stream,
int dataDomains, const ScriptMapping & dict ) const
{
class Visitor : public IDataDescriptionVisitor
{
public:
Visitor( BinaryIStream & stream, const ScriptMapping & map, bool onlyPersistent ) :
stream_( stream ),
map_( map ),
onlyPersistent_( onlyPersistent ) {}
bool visit( const DataDescription & dataDesc )
{
//TRACE_MSG( "EntityDescription::readStreamToDict: Reading "
// "property=%s %s\n", dataDesc.name().c_str(), dataDesc.dataType()->typeName().c_str() );
ScriptDataSink sink;
bool result = dataDesc.createFromStream( stream_, sink,
onlyPersistent_ );
ScriptObject pValue = sink.finalise();
IF_NOT_MF_ASSERT_DEV( result && pValue )
{
ERROR_MSG( "EntityDescription::readStream: "
"Could not create %s from stream.\n",
dataDesc.name().c_str() );
return false;
}
if (!dataDesc.insertItemInto( map_, pValue ))
{
ERROR_MSG( "EntityDescription::readStream: "
"Failed to set %s\n", dataDesc.name().c_str() );
Script::printError();
}
return !stream_.error();
}
private:
BinaryIStream & stream_;
const ScriptMapping & map_;
bool onlyPersistent_;
};
Visitor visitor( stream, dict,
((dataDomains & ONLY_PERSISTENT_DATA) != 0) );
return this->visit( dataDomains, visitor );
}
readStreamToDict的内部实现是使用一个visitor来递归遍历所有标记为EntityDescription::FROM_BASE_TO_CLIENT_DATA的属性,并从对应的stream里通过dataDesc.createFromStream来反序列化出对应的python对象。这里的createFromStream就是我们之前已经介绍过的单个属性的反序列化函数,不像之前Ghost属性解析调用完createFromStream之后都会使用一个attach的接口将数据写入到属性系统里:
DataDescription & dataDescr = *pEntityType_->propIndex( i );
// TODO - implement component properties processing here
MF_ASSERT( !dataDescr.isComponentised() );
DataType & dt = *dataDescr.dataType();
// read and attach the property
ScriptDataSink sink;
MF_VERIFY( dt.createFromStream( data, sink,
/* isPersistentOnly */ false ) );
ScriptObject value = sink.finalise();
if (!(properties_[i] = dt.attach( value, &propertyOwner_, i )))
{
CRITICAL_MSG( "Entity::initGhost(%u):"
"Error streaming off entity property %u\n", id_, i );
}
这里EntityDescription::readStreamToDict在解析出pValue之后会使用dataDesc.insertItemInto来更新,其实就是将解析好的数据以其属性路径作为key塞入到外部传毒的顶层dict里:
/**
*
*/
bool DataDescription::insertItemInto( const ScriptMapping & map,
const ScriptObject & item ) const
{
return map.setItem( this->fullName().c_str(), item, ScriptErrorRetain() );
}
解析出当前stream里属性数据的dict之后,需要将entity创建时的dict进行合并,以填充属性的默认值:
PyObject * pNewDict = this->pEntity()->type().newDictionary( stream,
EntityDescription::FROM_BASE_TO_CLIENT_DATA );
PyObject * pCurrDict = PyObject_GetAttrString( this, "__dict__" );
if ( !pNewDict || !pCurrDict ||
PyDict_Update( pCurrDict, pNewDict ) < 0 )
{
PY_ERROR_CHECK();
return false;
}
if (stream.error())
{
return false;
}
这样在initBasePlayerFromStream里就完成了当前玩家的属性初始化工作。
当自身客户端可见属性被修改时,在其属性修改回调ntity::onOwnedPropertyChanged执行的时候会执行下面的逻辑:
bool Entity::onOwnedPropertyChanged( const DataDescription * pDescription,
PropertyChange & change )
{
// 省略开头的一些判断
// If the data is for our own client, add it to our bundle
if (pDescription->isOwnClientData() && pReal_->pWitness() != NULL)
{
MemoryOStream stream;
int streamSize = 0;
Mercury::MessageID messageID = this->addChangeToExternalStream( change,
stream, *pDescription, &streamSize );
if (!pDescription->checkForOversizeLength( stream.size(), id_ ))
{
return false;
}
g_privateClientStats.trackEvent( pEntityType_->name(),
pDescription->name(), stream.size(), streamSize );
#if ENABLE_WATCHERS
pDescription->stats().countSentToOwnClient( stream.size() );
pEntityType_->stats().countSentToOwnClient( stream.size() );
#endif
pReal_->pWitness()->sendToClient( this->id(), messageID, stream,
streamSize );
}
// 省略无关代码
return true;
}
这里的addChangeToExternalStream负责构建一个RPC将当前属性的路径与最新值下发到客户端:
/**
* This method adds a property change for sending to a client.
*/
Mercury::MessageID Entity::addChangeToExternalStream(
const PropertyChange & change, BinaryOStream & stream,
const DataDescription & dataDescription, int * pStreamSize ) const
{
const ExposedPropertyMessageRange & messageRange =
ClientInterface::Range::entityPropertyRange;
int16 msgID = messageRange.msgIDFromExposedID(
dataDescription.clientServerFullIndex() );
if ((msgID == -1) || change.isNestedChange())
{
const Mercury::InterfaceElement & ie =
change.isSlice() ?
ClientInterface::sliceEntityProperty :
ClientInterface::nestedEntityProperty;
msgID = ie.id();
*pStreamSize = ie.streamSize();
change.addToExternalStream( stream,
dataDescription.clientServerFullIndex(),
pEntityType_->propCountClientServer() );
}
else
{
*pStreamSize = dataDescription.streamSize();
change.addValueToStream( stream );
}
return Mercury::MessageID( msgID );
}
常规来说属性修改的RPC是固定的,但是这里却是不定的,这里的开头有一个比较奇怪的操作messageRange.msgIDFromExposedID来生成指定的msgID:
/**
* This method returns the message id assoicated with an exposed id.
*
* @return The message id of the property change or -1 if the value does
* not fit in this range.
*/
int16 msgIDFromExposedID( int exposedID ) const
{
return (exposedID < this->numSlots()) ?
static_cast<int16>(exposedID + firstMsgID_) : -1;
}
这个接口的作用是把一个暴露给客户端的属性的客户端服务端通用索引dataDescription.clientServerFullIndex()映射为ClientInterface中为该单一属性生成的专用消息的msgID。这个属性修改的msgID会被生成到一个指定范围内,这个范围会在client_interface里被声明:
#define MERCURY_PROPERTY_RANGE_MSG( NAME, RANGE_PORTION ) \
namespace Range \
{ \
ExposedPropertyMessageRange NAME ## Range( NAME.id(), \
gMinder.addRange( NAME, RANGE_PORTION ) ); \
}
MF_CALLBACK_MSG( entityProperty )
MERCURY_PROPERTY_RANGE_MSG( entityProperty, 1 ) /* All of the remaining range */
这个宏展开之后会变为下面的代码:
ExposedPropertyMessageRange entityPropertyRange(entityProperty.id(), gMinder.addRange(entityProperty, 1))
这里的gMinder.addRange负责为entityProperty分配254个独立的MsgId,并批量注册这些MsgId的handle为ServerConnection::entityProperty:
/**
* This method populates the interface with a range of the same interface
* element.
*
* @param ie The InterfaceElement to use.
* @param rangePortion Specifies the proportion of the remaining range to
* consume. A value of x specifies 1/x.
*/
MessageID InterfaceMinder::addRange( const InterfaceElement & ie,
int rangePortion )
{
MF_ASSERT( ie.id() == elements_.back().id() );
const size_t startID = elements_.size();
const size_t endID = startID + (254 - startID)/rangePortion;
while (elements_.size() <= endID)
{
this->add(
ie.name(), ie.lengthStyle(), ie.lengthParam(), ie.pHandler() );
}
return MessageID( endID );
}
Entity::addChangeToExternalStream这样来生成MsgID的目的是区分一些顶层的单一属性的修改,这样的回调逻辑ServerConnection::entityProperty会节省很多判断,因为这里可以直接从header里拿到属性的索引,从而直接反序列化并赋值即可:
/**
* This method handles an entity property update from the server.
*/
void ServerConnection::entityProperty( const Mercury::Address & srcAddr,
const Mercury::UnpackedMessageHeader & header,
BinaryIStream & stream )
{
int exposedPropertyID =
ClientInterface::Range::entityPropertyRange.exposedIDFromMsgID(
header.identifier );
pHandler_->onEntityProperty( selectedEntityID_, exposedPropertyID, stream );
}
/*
* Override from ServerMessageHandler.
*/
void BWServerMessageHandler::onEntityProperty( EntityID id, int propertyID,
BinaryIStream & data )
{
entities_.handleEntityProperty( id, propertyID, data );
}
/**
* @see ServerMessageHandler::onEntityProperty
*/
void BWEntities::handleEntityProperty( EntityID entityID, int propertyID,
BinaryIStream & data )
{
BWEntityPtr pEntity = this->findAny( entityID );
if (!pEntity)
{
ERROR_MSG( "BWEntities::handleEntityProperty: "
"No such entity %d. propertyID = %d\n",
entityID, propertyID );
data.finish();
return;
}
bool shouldCallCallback = pEntity->isPlayer() || pEntity->isInWorld();
pEntity->onProperty( propertyID, data, !shouldCallCallback );
}
/**
* This method handles a change to a property of the entity sent
* from the server.
*/
void Entity::onProperty( int propertyID, BinaryIStream & data,
bool isInitialising )
{
BW_GUARD;
MF_ASSERT( !this->isDestroyed() );
SimpleClientEntity::propertyEvent( pPyEntity_, this->type().description(),
propertyID, data, /*shouldUseCallback:*/ !isInitialising );
}
/**
* Update the identified property on the given entity. Returns true if
* the property was found to update.
*/
bool propertyEvent( ScriptObject pEntity, const EntityDescription & edesc,
int propertyID, BinaryIStream & data, bool shouldUseCallback )
{
BW_GUARD;
EntityPropertyOwner king( pEntity, edesc );
ScriptObject pOldValue = king.setOwnedProperty( propertyID, data );
if (!pOldValue)
{
return false;
}
if (shouldUseCallback)
{
const DataDescription * pDataDescription =
edesc.clientServerProperty( propertyID );
MF_ASSERT_DEV( pDataDescription != NULL );
BW::string methodName = "set_" + pDataDescription->name();
Script::call(
PyObject_GetAttrString( pEntity.get(), (char*)methodName.c_str() ),
PyTuple_Pack( 1, pOldValue.get() ),
"Entity::propertyEvent: ",
/*okIfFunctionNull:*/true );
}
return true;
}
如果是嵌套属性修改或者区块属性修改,向下发送的属性更新RPC就会被替换为ClientInterface::sliceEntityProperty 或者ClientInterface::nestedEntityProperty, 数据打包的时候就会将这个字段的完整路径传递进去:
const Mercury::InterfaceElement & ie =
change.isSlice() ?
ClientInterface::sliceEntityProperty :
ClientInterface::nestedEntityProperty;
msgID = ie.id();
*pStreamSize = ie.streamSize();
change.addToExternalStream( stream,
dataDescription.clientServerFullIndex(),
pEntityType_->propCountClientServer() );
对应的客户端的处理则会走更加复杂的分支:
/**
* This method handles a nested entity property update from the server.
*/
void ServerConnection::nestedEntityProperty( BinaryIStream & stream )
{
pHandler_->onNestedEntityProperty( selectedEntityID_, stream, false );
}
/**
* This method handles an update to a slice sent from the server.
*/
void ServerConnection::sliceEntityProperty( BinaryIStream & stream )
{
pHandler_->onNestedEntityProperty( selectedEntityID_, stream, true );
}
/*
* Override from ServerMessageHandler.
*/
void BWServerMessageHandler::onNestedEntityProperty( EntityID id,
BinaryIStream & data, bool isSlice )
{
entities_.handleNestedEntityProperty( id, data, isSlice );
}
/**
* @see ServerMessageHandler::onNestedEntityProperty
*/
void BWEntities::handleNestedEntityProperty( EntityID entityID,
BinaryIStream & data, bool isSlice )
{
BWEntityPtr pEntity = this->findAny( entityID );
if (!pEntity)
{
ERROR_MSG( "BWEntities::handleNestedEntityProperty: "
"No such entity %d. isSlice = %d\n",
entityID, isSlice );
data.finish();
return;
}
bool shouldCallCallback = pEntity->isPlayer() || pEntity->isInWorld();
pEntity->onNestedProperty( data, isSlice, !shouldCallCallback );
}
/**
* This method handles a change to a nested property of the entity sent
* from the server.
*/
void Entity::onNestedProperty( BinaryIStream & data, bool isSlice,
bool isInitialising )
{
BW_GUARD;
MF_ASSERT( !this->isDestroyed() );
SimpleClientEntity::nestedPropertyEvent( pPyEntity_,
this->type().description(), data,
/*shouldUseCallback:*/ !isInitialising, isSlice );
}
/**
* Update the identified property on the given entity. Returns true if
* the property was found to update.
*/
bool nestedPropertyEvent( ScriptObject pEntity, const EntityDescription & edesc,
BinaryIStream & data, bool shouldUseCallback, bool isSlice )
{
BW_GUARD;
EntityPropertyOwner king( pEntity, edesc );
ScriptObject * ppOldValue = NULL;
ScriptList * ppChangePath = NULL;
ScriptObject pOldValue = ScriptObject::none();
ScriptList pChangePath;
if (shouldUseCallback)
{
ppOldValue = &pOldValue;
ppChangePath = &pChangePath;
}
int topLevelIndex = king.setNestedPropertyFromExternalStream( data, isSlice,
ppOldValue, ppChangePath );
// if this was a top-level property then call the set handler for it
if (shouldUseCallback)
{
const DataDescription * pDataDescription =
edesc.clientServerProperty( topLevelIndex );
MF_ASSERT_DEV( pDataDescription != NULL );
pDataDescription->callSetterCallback( pEntity, pOldValue, pChangePath,
isSlice );
}
return true;
}
从这里贴出来的nestedPropertyEvent的完整实现与之前贴出来的propertyEvent完整实现做对比可以看出,最主要的差别就是从EntityPropertyOwner::setOwnedProperty修改为了TopLevelPropertyOwner::setNestedPropertyFromExternalStream。EntityPropertyOwner::setOwnedProperty的实现很简单,找到索引对应的顶层属性描述信息pDD之后执行数据反序列化,并将序列化出来的值通过PyObject_SetAttrString设置到属性系统的dict就结束了:
virtual ScriptObject EntityPropertyOwner::setOwnedProperty( int ref, BinaryIStream & data )
{
BW_GUARD;
const DataDescription * pDD = edesc_.clientServerProperty( ref );
if (pDD == NULL) return ScriptObject();
ScriptDataSink sink;
if (!pDD->createFromStream( data, sink, /* isPersistentOnly */ false ))
{
ERROR_MSG( "Entity::handleProperty: "
"Error streaming off new property value\n" );
return ScriptObject();
}
ScriptObject pNewObj = sink.finalise();
ScriptObject pOldObj(
PyObject_GetAttrString( e_.get(), (char*)pDD->name().c_str() ),
ScriptObject::STEAL_REFERENCE );
if (!pOldObj)
{
PyErr_Clear();
pOldObj = Py_None;
}
int err = PyObject_SetAttrString(
e_.get(), (char*)pDD->name().c_str(), pNewObj.get() );
if (err == -1)
{
ERROR_MSG( "Entity::handleProperty: "
"Failed to set new property into Entity\n" );
PyErr_PrintEx(0);
}
return pOldObj;
}
但是对于setNestedPropertyFromExternalStream,找到对应的属性就需要一个while循环来获取了:
/**
* This method sets an owned property from a stream that has been sent from the
* server to a client.
*/
int TopLevelPropertyOwner::setNestedPropertyFromExternalStream(
BinaryIStream & stream, bool isSlice,
ScriptObject * ppOldValue,
ScriptList * ppChangePath )
{
PropertyChangeReader * pReader = getPropertyChangeReader( isSlice );
return pReader->readCompressedPathAndApply( stream, this,
ppOldValue, ppChangePath );
}
/**
* This method reads and applies a property change.
*
* @param stream The stream to read from.
* @param pOwner The top-level owner of the property.
* @param ppOldValue If not NULL, this is set to the old value of the property.
* @param ppChangePath If not NULL, this is set to the path of the property
* that has changed.
*
* @return The top-level index of the change.
*/
int PropertyChangeReader::readCompressedPathAndApply( BinaryIStream & stream,
PropertyOwnerBase * pOwner,
ScriptObject * ppOldValue,
ScriptList * ppChangePath )
{
int topLevelIndex = -1;
BitReader bits( stream );
while ((bits.get( 1 ) != 0) && pOwner)
{
int numProperties = pOwner->getNumOwnedProperties();
int index = bits.get( BitReader::bitsRequired( numProperties ) );
if (topLevelIndex == -1)
{
topLevelIndex = index;
}
else
{
this->updatePath( ppChangePath, pOwner->getPyIndex( index ) );
}
pOwner = pOwner->getChildPropertyOwner( index );
}
if (!pOwner)
{
ERROR_MSG( "PropertyChangeReader::readAndApply: Invalid path to owner. "
"topLevelIndex = %d\n",
topLevelIndex );
return -1 - std::max( topLevelIndex, 0 );
}
int index = this->readExtraBits( bits, pOwner->getNumOwnedProperties() );
if (topLevelIndex == -1)
{
topLevelIndex = index;
}
else
{
this->updatePath( ppChangePath );
}
this->doApply( stream, pOwner, ppOldValue, ppChangePath );
return topLevelIndex;
}
同时在执行属性更新的时候,为了更好的通知脚本,还需要将属性的完整路径拼接出来一个ScriptList,对比之前的简单顶层属性只需要提供一个属性索引即可。
// if this was a top-level property then call the set handler for it
if (shouldUseCallback)
{
const DataDescription * pDataDescription =
edesc.clientServerProperty( topLevelIndex );
MF_ASSERT_DEV( pDataDescription != NULL );
pDataDescription->callSetterCallback( pEntity, pOldValue, pChangePath,
isSlice );
}
综上,使用ExposedPropertyMessageRange来对顶级属性做简单的索引映射而不是提供完整路径可以加速客户端的属性回放处理。
AOI内的Entity的属性同步
讲解完仅自身客户端可见属性和仅Ghost可见属性的同步之后,剩下的需要同步的属性就是最重要的能被其他客户端可见的属性。注意在目前的属性同步设计之中,如果一个属性被设计为能被其他客户端可见,那么这个属性一定也是Ghost可见的,因为Ghost的作用就是在当前Cell中向周围的其他RealEntity同步其代理的RealEntity的属性变化,因此这个其他客户端可见属性的处理是嵌入到Ghost可见属性判断部分的,只不过在内部加了一个isOtherClientData的分支判定:
bool Entity::onOwnedPropertyChanged( const DataDescription * pDescription,
PropertyChange & change )
{
// 省略开头的一些判断
if (pDescription->isGhostedData())
{
// If the data is for other clients, add an event to our history.
if (pDescription->isOtherClientData())
{
if (pDescription->shouldSendLatestOnly() &&
change.isNestedChange())
{
WARNING_MSG( "Entity::onOwnedPropertyChanged(%u): "
"%s.%s has SendLatestOnly enabled and was partially "
"changed. Sending full property.\n",
id_, this->pType()->name(),
pDescription->name().c_str() );
MF_VERIFY_DEV(
propertyOwner_.changeOwnedProperty(
properties_[ pDescription->localIndex() ],
properties_[ pDescription->localIndex() ],
*pDescription->dataType(),
pDescription->localIndex(),
/* forceChange: */ true ) );
return true;
}
MemoryOStream stream;
int streamSize = 0;
Mercury::MessageID msgID =
this->addChangeToExternalStream( change, stream,
*pDescription, &streamSize );
if (!pDescription->checkForOversizeLength( stream.size(), id_ ))
{
return false;
}
g_publicClientStats.trackEvent( pEntityType_->name(),
pDescription->name(), stream.size(), streamSize );
// Add history event for clients
HistoryEvent * pEvent =
pReal_->addHistoryEvent( msgID, stream,
*pDescription, streamSize, pDescription->detailLevel() );
propertyEventStamps_.set( *pDescription, pEvent->number() );
}
// 省略之前介绍的ghost可见属性的处理
}
// 省略无关逻辑
}
处理其他客户端可见属性的开头就先判断当前属性是否被标记为了应该只发送最新值shouldSendLatestOnly。如果是只发送最新值的字段,但是当前修改的属性是这个字段的一些嵌套的子属性,那么这次并不会只同步这些子属性,而是使用changeOwnedProperty将当前整个属性也全都重新修改一遍来触发完整的同步,并跳过后续的处理。如果不这样将所有子属性都同步过去的话,发送一个局部增量nested change可能会导致客户端/历史重放端得到不完整或错误的状态。
接下来的则是当前普通情况下的其他客户端可见属性的广播流程:
- 先使用
addChangeToExternalStream将当前属性的修改信息打包为一个RPC,这个函数我们之前已经介绍过了,所以不再跟进 - 然后使用
addHistoryEvent将这个属性变化打上一个递增序列号,加入到HistoryEvent队列里, - 将当前属性的最新版本修改为这个生成的
HistoryEvent的序列号里,并记录到propertyEventStamps_这个map里
这里最重要的就是addHistoryEvent函数,我们需要跟进一下这个函数的处理:
/**
* This method adds a message on to the event history.
*/
HistoryEvent * RealEntity::addHistoryEvent( uint8 type,
MemoryOStream & stream,
const MemberDescription & description,
int16 msgStreamSize,
HistoryEvent::Level level )
{
HistoryEvent * pNewEvent =
entity_.addHistoryEventLocally( type, stream, description,
msgStreamSize, level );
// Send to ghosts.
Haunts::iterator iter = haunts_.begin();
while (iter != haunts_.end())
{
Haunt & haunt = *iter;
Mercury::Bundle & bundle = haunt.bundle();
uint32 startLength = bundle.size();
bundle.startMessage( CellAppInterface::ghostHistoryEvent );
bundle << this->entity().id();
pNewEvent->addToStream( bundle );
description.stats().countSentToGhosts( bundle.size() - startLength );
++iter;
}
return pNewEvent;
}
addHistoryEventLocally负责为这次属性变化构造出一个HistoryEvent,然后使用CellAppInterface::ghostHistoryEvent这个RPC将当前的HistoryEvent广播到所有的Ghost上。addHistoryEventLocally函数内部会以这些传入参数构造一个HistoryEvent,并附加一个递增序列号,构造完成之后将这个HistoryEvent添加到内部的一个数组eventHistory的末尾:
/**
* This method adds the given history event to this entity locally.
* @see RealEntity::addHistoryEvent for what you probably want.
*/
HistoryEvent * Entity::addHistoryEventLocally( uint8 type,
MemoryOStream & stream,
const MemberDescription & description, int16 msgStreamSize,
HistoryEvent::Level level )
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
#if ENABLE_WATCHERS
pEntityType_->stats().countAddedToHistoryQueue( stream.size() );
#endif
return this->eventHistory().add( this->getNextEventNumber(),
type, stream, description, level, msgStreamSize );
}
实际上这里的eventHistory().add所作的工作比前述的几句多一个sendLatestOnly的判定:
/**
* This method adds a new event to the event history.
*/
HistoryEvent * EventHistory::add( EventNumber eventNumber,
uint8 type, MemoryOStream & stream,
const MemberDescription & description,
HistoryEvent::Level level, int16 msgStreamSize )
{
HistoryEvent * pNewEvent = NULL;
if (description.shouldSendLatestOnly())
{
int latestEventIndex = description.latestEventIndex();
MF_ASSERT( latestEventIndex != -1 );
Container::iterator latestEventIter =
latestEventPointers_[ latestEventIndex ];
if (latestEventIter != container_.end())
{
latestEventPointers_[ latestEventIndex ] = container_.end();
pNewEvent = *latestEventIter;
pNewEvent->recreate( type, eventNumber,
stream.data(), stream.size(),
level, &description, msgStreamSize );
container_.erase( latestEventIter );
}
}
else
{
MF_ASSERT( description.latestEventIndex() == -1 );
}
if (!pNewEvent)
{
pNewEvent = new HistoryEvent( type, eventNumber,
stream.data(), stream.size(), level, &description,
description.latestEventIndex(),
description.isReliable(), msgStreamSize );
}
stream.shouldDelete( false );
#if ENABLE_WATCHERS
description.stats().countAddedToHistoryQueue( stream.size() );
#endif
this->add( pNewEvent );
return pNewEvent;
}
这里会在发现当前的属性是一个只需要发送最新数据的属性时,并不会执行普通逻辑路径上的new HistoryEvent,而是先获取之前记录的这个属性的修改版本号latestEventIndex,然后再从本地的数组latestEventPointers_获取这个版本号对应的最近一次分配的HistoryEvent。如果这个HistoryEvent存在的话,则将这个HistoryEvent里面存储的内容通过recreate接口直接用最新数据全覆盖,并将这个HistoryEvent从container_数组里删除。后续的this->add又会将当前的pNewEvent放到当前container_的末尾。这样的操作下保证这个description在整个container_里自会保留一条记录,每次一个新的记录过来的时候都会将之前的记录销毁。这里执行container_.erase的时候为了避免latestEventPointers_里存储的迭代器失效,container_的底层实现是一个链表:
// Events that have LatestChangeOnly have (at most) a single instance.
typedef BW::vector< Container::iterator > LatestChangePointers;
LatestChangePointers latestEventPointers_;
每次一个其他客户端可见属性被销毁的时候,都有可能创建一个新的元素塞入到这个链表里,随着运行时间的增长这个链表里的元素数量可能会变得很大。为了避免无限增长,CellApp上会使用计时器来定期调用handleTrimHistoriesTimeSlice来遍历每个Entity来清除一些创建时间比较久远的元素:
/**
* This method handles timeout events.
*/
void CellApp::handleTimeout( TimerHandle /*handle*/, void * arg )
{
switch (reinterpret_cast<uintptr>( arg ))
{
case TIMEOUT_GAME_TICK:
this->handleGameTickTimeSlice();
break;
case TIMEOUT_TRIM_HISTORIES:
this->handleTrimHistoriesTimeSlice();
break;
// 省略一些代码
}
}
/**
* This method handles the trim histories time slice.
*
* TODO: This is dodgy. We don't want to trim all of the histories in one go.
*/
void CellApp::handleTrimHistoriesTimeSlice()
{
// Keep iterator in tact. This is necessary as trimEventHistory can call
// onWitnessed (which it probably should not do).
Entity::callbacksPermitted( false );
{
EntityPopulation::const_iterator iter = Entity::population().begin();
while (iter != Entity::population().end())
{
// TODO: Could skip dead entities.
iter->second->trimEventHistory( 0 );
iter++;
}
}
Entity::callbacksPermitted( true );
Entity::population().expireRealChannels();
}
/**
* This method trims the event history associated with this entity. It is called at a
* frequency so that all entities in an AoI would have been visited at least once over the
* period.
*
* This method also calculates whether this entity is no longer being witnessed.
*
* @param cleanUpTime All events with a time less than this should be deleted.
*/
void Entity::trimEventHistory( GameTime /*cleanUpTime*/ )
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
// trim the event history
eventHistory_.trim();
// 省略一些无关代码
}
最终会执行到EventHistory::trim操作上,这个函数的作用其实就是记录一下当前容器最后一个元素的序列号到trimToEvent_,同时在更新trimToEvent_之前从链表里删除所有序列号小于上次执行trim后记录的trimToEvent_,总体作用就是只保留上次trim执行之后新加入的元素:
/**
* This method is used to trim the EventHistory. It deletes all of the events
* that were added before the last trim call (leaving only those events added
* since the last trim call).
*
* This method should not be called more frequently than it takes any
* RealEntityWithWitnesses to go through all histories.
*/
void EventHistory::trim()
{
// TODO: This is a bit dodgy because we do not know how often to go through
// this.
while (!container_.empty() &&
container_.front()->number() <= trimToEvent_)
{
if (container_.front()->isReliable())
{
lastTrimmedEventNumber_ = container_.front()->number();
}
this->deleteEvent( container_.front() );
container_.pop_front();
}
trimToEvent_ = container_.empty() ? 0 : container_.back()->number();
}
当Ghost接收到CellAppInterface::ghostHistoryEvent这个RPC之后,会将这个HistoryEvent从参数里解析出来,并放到自身存储的HistoryEvent数组里:
/**
* This method handles a message from the real entity associated with this
* entity. It adds a history event to this ghost.
*/
void Entity::ghostHistoryEvent( BinaryIStream & data )
{
MF_ASSERT( !this->isReal() );
++lastEventNumber_;
EventNumber newNumber = this->eventHistory().addFromStream( data );
MF_ASSERT( newNumber == lastEventNumber_ );
}
/**
* This method adds a history event that has been streamed over the network.
* This is done from the real entity to its ghost entities.
*/
EventNumber EventHistory::addFromStream( BinaryIStream & stream )
{
EventNumber eventNumber;
uint8 type;
HistoryEvent::Level level;
int latestEventIndex;
bool isReliable;
int16 msgStreamSize;
void * historyData;
int length;
HistoryEvent::extractFromStream( stream, eventNumber, type, level,
latestEventIndex, isReliable, msgStreamSize, historyData, length );
HistoryEvent * pNewEvent = NULL;
if (latestEventIndex != -1)
{
Container::iterator latestEventIter =
latestEventPointers_[ latestEventIndex ];
if (latestEventIter != container_.end())
{
latestEventPointers_[ latestEventIndex ] = container_.end();
pNewEvent = *latestEventIter;
pNewEvent->recreate( type, eventNumber, historyData, length,
level, latestEventIndex, msgStreamSize );
container_.erase( latestEventIter );
}
}
if (!pNewEvent)
{
pNewEvent = new HistoryEvent( type, eventNumber, historyData, length,
level, NULL, latestEventIndex, isReliable, msgStreamSize );
}
this->add( pNewEvent );
return pNewEvent->number();
}
从这个EventHistory::addFromStream可以看出,这里的反序列化逻辑基本与之前在RealEntity上的为属性修改新建HistoryEvent时的逻辑一样,看上去并没有执行属性的本地修改记录回放的操作。其实这里处理其他客户端可见属性的逻辑执行完成之后并不会直接返回, 而是会继续执行之前介绍过的GhostEntity可见属性的广播,会再发送一个ghostedDataUpdate消息到所有的Ghost上,从而触发Ghost上的属性回放。
那这个EventHistory既然不参与属性在GhostEntity上的回放,为什么RealEntity上要额外产生一个CellAppInterface::ghostHistoryEvent消息并广播到所有的GhostEntity上呢。答案是EventHistory这个类型是为了往其他客户端执行属性同步而存在的,使用的地方在Entity::writeClientUpdateDataToBundle函数里:
/**
* This method writes any relevant information about this entity that has
* occurred since the last update time to the given bundle. This includes
* changes in volatile position and new history events.
*
* @param bundle The bundle to put the information on.
* @param basePos The reference point for relative positions.
* @param cache The current entity cache.
* @param lodPriority Indicates what level of detail to use.
*
* @return True if a reliable position update message was included in the
* bundle, false otherwise.
*/
bool Entity::writeClientUpdateDataToBundle( Mercury::Bundle & bundle,
const Vector3 & basePos,
EntityCache & cache,
float lodPriority ) const
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
const int initSize = bundle.size();
int oldSize = initSize;
int numEvents = 0;
this->writeVehicleChangeToBundle( bundle, cache );
// Send the appropriate history for this entity
EventHistory::const_reverse_iterator eventIter = eventHistory_.rbegin();
EventHistory::const_reverse_iterator eventEnd = eventHistory_.rend();
// Go back to find the correct place then forward to add to the bundle in
// chronological order.
// TODO: Consider the wrap around case. To wrap around a 32-bit value
// needs 12 events a second for 10 years (24 hours a day).
bool hasEventsToSend = false;
bool hasSelectedEntity = false;
while (eventIter != eventEnd &&
(*eventIter)->number() > cache.lastEventNumber())
{
HistoryEvent & event = **eventIter;
hasEventsToSend = hasEventsToSend ||
event.shouldSend( lodPriority, cache.detailLevel() );
eventIter++;
}
// 先省略后续代码
}
这里决定哪些属性需要同步下去的时候,并不像开始进入AOI时的去遍历属性,而是遍历我们之前提到的eventHistory_,这个eventHistory_是一个队列,存储了所有其他客户端可见属性的修改记录,每个记录都携带了属性索引、最新值和修改版本号等信息。这里会从eventHistory_的末尾开始遍历,直到遇到的属性历史版本号小于等于上次同步后记录的版本号lastEventNumber。这个已同步最大版本号字段记录在EntityCache对象上,这个类型的意义将在后续的AOI相关章节的时候进行介绍。遍历的时候会使用HistoryEvent::shouldSend来过滤掉一些不需要向下发送的HistoryEvent,以节省流量。
在计算完所有需要向客户端下发的同步属性History之后,还需要计算是否需要同步位置朝向信息,因为这两个数据是不走属性同步的:
bool Entity::writeClientUpdateDataToBundle( Mercury::Bundle & bundle,
const Vector3 & basePos,
EntityCache & cache,
float lodPriority ) const
{
// 省略之前的代码
bool hasAddedReliableRelativePosition = false;
// Not currently enabled as it affects the filters if this is not sent
// regularly.
//if (cache.lastVolatileUpdateNumber() != volatileUpdateNumber_)
{
cache.lastVolatileUpdateNumber( volatileUpdateNumber_ );
if (this->volatileInfo().hasVolatile( lodPriority ))
{
const bool isReliable = hasEventsToSend;
if (cache.isAlwaysDetailed() || (cache.isPrioritised() && CellAppConfig::sendDetailedPlayerVehicles()) )
{
this->writeVolatileDetailedDataToBundle( bundle,
cache.idAlias(), isReliable );
}
else
{
hasAddedReliableRelativePosition =
this->writeVolatileDataToBundle( bundle, basePos,
cache.idAlias(), lodPriority, isReliable );
}
hasSelectedEntity = true;
oldSize = bundle.size();
g_nonVolatileBytes += (oldSize - initSize);
#if ENABLE_WATCHERS
pEntityType_->stats().countVolatileSentToOtherClients(
oldSize - initSize );
#endif
}
}
// 省略后续代码
}
这里的writeVolatileDetailedDataToBundle和writeVolatileDataToBundle都是填充位置和朝向信息的相关代码,这里先不去追究这两个函数的实现细节。在填充完位置和朝向信息之后,才开始来填充属性信息,这里的addEntitySelectMessage只是用来填充当前Entity的id字段,这样客户端收到这个消息之后才知道这些属性和位置朝向更新目标是客户端的哪个Entity:
bool Entity::writeClientUpdateDataToBundle( Mercury::Bundle & bundle,
const Vector3 & basePos,
EntityCache & cache,
float lodPriority ) const
{
// 省略之前的代码
if (hasEventsToSend)
{
if (!hasSelectedEntity)
{
cache.addEntitySelectMessage( bundle );
hasSelectedEntity = true;
}
while (eventIter != eventHistory_.rbegin())
{
eventIter--;
HistoryEvent & event = **eventIter;
if (event.shouldSend( lodPriority, cache.detailLevel() ))
{
if (event.pName())
{
g_totalPublicClientStats.trackEvent( pEntityType_->name(),
event.pName()->c_str(), event.msgLen(),
event.msgStreamSize() );
}
++numEvents;
event.addToBundle( bundle );
}
}
}
cache.lastEventNumber( this->lastEventNumber() );
// 省略后续代码
}
这里每个属性的单次历史变化都会构造一个BaseAppIntInterface::sendMessageToClient消息发送到客户端,注意到这里的msgID是HistoryEvent内部记录的msgID。回顾一下HistoryEvent创建时的字段填充,这个msgID_字段就是当前属性的description.exposedMsgID(),所以当这个消息发送下去之后,客户端会通过ServerConnection::entityProperty来执行属性的回放,具体的exposedMsgID到属性字段的映射机制前文中已经介绍了,因此这里就不提了:
/**
* This method adds this event to the input bundle.
*
* @param bundle The bundle to add the event to.
*/
void HistoryEvent::addToBundle( Mercury::Bundle & bundle )
{
if (pDescription_)
{
pDescription_->stats().countSentToOtherClients( msgLen_ );
}
// Script method calls and property changes
bundle.startMessage(
isReliable_ ?
BaseAppIntInterface::sendMessageToClient :
BaseAppIntInterface::sendMessageToClientUnreliable );
bundle << msgID_;
bundle << (int16) msgStreamSize_;
bundle.addBlob( msg_, msgLen_ );
}
/**
* This method handles an entity property update from the server.
*/
void ServerConnection::entityProperty( const Mercury::Address & srcAddr,
const Mercury::UnpackedMessageHeader & header,
BinaryIStream & stream )
{
int exposedPropertyID =
ClientInterface::Range::entityPropertyRange.exposedIDFromMsgID(
header.identifier );
pHandler_->onEntityProperty( selectedEntityID_, exposedPropertyID, stream );
}
当所有要同步的属性历史都打包好了之后,就可以将cache里的最大同步版本号lastEventNumber设置为当前Entity里的属性历史的最大版本号lastEventNumber,意思是任何版本号小于等于这个lastEventNumber的属性历史都不会往当前EntityCache关联的客户端进行发送了。
属性同步的LOD
为了节省属性同步的流量,可以为每个属性设定一个同步距离挡位,当此Entity与当前的客户端对应的RealEntity的距离小于一定值的时候,这个属性才会被同步,这个距离设置就叫做属性LOD。例如50M为第一个挡位,这个时候会同步Name字段, 40M一个挡位,在这个距离内体型相关字段才开始同步, 20M一个挡位,在此范围内头套挂饰等小物件外观才开始同步。
在发送一个HistoryEvent的时候,会先用shouldSend接口来过滤,这里就会比较属性的Lod等级是否小于当前的DetailLevel:
/**
* This method decides whether to send this event to a client based on the
* input priority threshold and detail level.
*/
INLINE bool HistoryEvent::shouldSend( float threshold, int detailLevel ) const
{
return level_.shouldSend( threshold, detailLevel );
}
/**
* This class is a bit of a hack. For state change events, we want to use
* a detail level, while for messages (that is, events with no state
* change), we want to store a priority.
*
*/
class Level
{
public:
Level() {}
Level( int i ) : detail_( i ), isDetail_( true ) {}
Level( float f ) : priority_( f ), isDetail_( false ) {};
bool shouldSend( float threshold, int detailLevel ) const
{
return isDetail_ ?
(detailLevel <= detail_) :
(threshold < priority_);
}
private:
union
{
float priority_;
int detail_;
};
bool isDetail_;
};
这里做shouldSend判定的时候,有两个判定分支:一个是基于优先级的,同步优先级的值小于此设置才可以同步;一个是基于LOD挡位detailLevel的,当前同步的属性LOD小于等于这个detailLevel才可以同步。
在Entity::writeClientUpdateDataToBundle里执行EventHistory下发的时候,会利用EntityCache里存储好的DetailLevel来做shouldSend过滤。打包好当前DetailLevel所需要发送的属性历史之后,会利用当前距离重新计算一下这个EntityCache的最新应该拥有的DetailLevel。如果新的DetailLevel比原来的DetailLevel小,则可能需要将一些新DetailLevel下才可见的属性也同步下去。这里用一个简单的例子来讲一下为什么需要处理DetailLevel的变化。假设Entity(A)上有两个属性P0, P1,其对应的LODLevel分别是0, 1。某个时刻Entity(A)进入到RealEntity(B)的AOI,此时计算出来的DetailLevel为1,因此只发送P1到客户端去创建对应的ClientEntity(A)。当后续P0的值发生改变的时候,由于P0的LodLevel小于当前的DetailLevel,所以这次值改变不会被同步下去。后面在位置移动之后,新的DetailLevel变成了0,此时需要将LodLLevel=0的所有属性的最新值下发下去:
bool Entity::writeClientUpdateDataToBundle( Mercury::Bundle & bundle,
const Vector3 & basePos,
EntityCache & cache,
float lodPriority ) const
{
// 省略之前的代码
hasSelectedEntity |= cache.updateDetailLevel( bundle, lodPriority,
hasSelectedEntity );
// 省略后续代码
}
/**
* This method is used to update the detail level associated with this cache.
* If the detail is increased, the necessary information is added to the input
* bundle that will be sent to the viewing client.
*
* @return True if we added data to the bundle, False otherwise.
*/
bool EntityCache::updateDetailLevel( Mercury::Bundle & bundle,
float lodPriority, bool hasSelectedEntity )
{
bool hasWrittenToStream = false;
// Update the LoD level and add any necessary info to the bundle.
const EntityDescription & entityDesc =
this->pEntity()->pType()->description();
const DataLoDLevels & lodLevels = entityDesc.lodLevels();
while (lodLevels.needsMoreDetail( detailLevel_, lodPriority ))
{
detailLevel_--;
hasWrittenToStream |= this->addChangedProperties( bundle, &bundle,
/* shouldSelectEntity */ !hasSelectedEntity );
hasSelectedEntity |= hasWrittenToStream;
}
while (lodLevels.needsLessDetail( detailLevel_, lodPriority ))
{
this->lodEventNumber( detailLevel_, this->lastEventNumber() );
detailLevel_++;
}
return hasWrittenToStream;
}
由于位置可能出现跳变,所以detailLevel也可能出现跳变,所以这里使用一个循环来遍历中间的所有detailLevel,利用addChangedProperties将每个detailLevel里的属性的最新值计入到bundle里。这里的addChangedProperties函数非常重要,其逻辑分为了两个部分:
- 先统计当前
LOD层级detailLevel_下多少个客户端可见属性的最新版本号大于之前下发版本号lodEventNumber,记录结果为numToSend,并将这个值填入到消息里 - 遍历所有当前
LOD设置下需要同步到客户端的属性,将这个属性的索引和最新值加入到消息里
/**
* This method adds the new property values of the current detail level to the
* input stream.
*
* @param stream The stream to add the changes to.
* @param pBundleForHeader If not NULL, a message is started on this bundle
* if any properties are added.
* @param shouldSelectEntity If true, before a new message is started on the
*
* @return True if we added any property values, false if there were no new
* property values at this level of detail.
*/
bool EntityCache::addChangedProperties( BinaryOStream & stream,
Mercury::Bundle * pBundleForHeader, bool shouldSelectEntity )
{
const EntityDescription & entityDesc =
this->pEntity()->pType()->description();
int numProperties = entityDesc.clientServerPropertyCount();
EventNumber lodEventNumber = this->lodEventNumber( detailLevel_ );
int numToSend = 0;
for (int i = 0; i < numProperties; i++)
{
const DataDescription * pDataDesc =
entityDesc.clientServerProperty( i );
if (pDataDesc->detailLevel() == detailLevel_ &&
this->pEntity()->propertyEventStamps().get( *pDataDesc ) >
lodEventNumber )
{
++numToSend;
}
}
if (numToSend == 0)
{
return false;
}
MF_ASSERT( numToSend < 256 );
if (pBundleForHeader != NULL)
{
if (shouldSelectEntity)
{
this->addEntitySelectMessage( *pBundleForHeader );
}
pBundleForHeader->startMessage( BaseAppIntInterface::updateEntity );
}
stream << uint8( numToSend );
for (int i = 0; i < numProperties; i++)
{
const DataDescription * pDataDesc =
entityDesc.clientServerProperty( i );
if (pDataDesc->detailLevel() == detailLevel_ &&
this->pEntity()->propertyEventStamps().get( *pDataDesc ) >
lodEventNumber )
{
ScriptObject pValue = this->pEntity()->propertyByDataDescription(
pDataDesc );
// 忽略一些容错代码
--numToSend;
stream << uint8(i);
ScriptDataSource source( pValue );
pDataDesc->addToStream( source, stream,
/* isPersistentOnly */ false );
}
}
MF_ASSERT( numToSend == 0 );
return true;
}
从这个函数可以看出,addChangedProperties只能一次处理一个detailLevel,所以updateDetailLevel这里会使用循环来调用这个函数。
在打包好每个DetailLevel可见的属性之后,再将当前EntityCache上记录的每个DetailLevel的数据同步版本更新为当前最新的数据同步版本,这样做的目的是为了节省流量。如果没有记录lodEventNumber这个数组的话,每次DetailLevel变小都会导致新DetailLevel里的所有属性都同步下去,即使相应的属性在第一次同步之后并没有变化。举个例子来说,如果Entity(A)的DetailLevel经历过1->0->1->0的变化,但是属性P0的值在这段时间内都没有变化,那么只需要在第一次1->0的时候向下增量同步P0,第二次1->0的时候由于之前记录的lodEventNumber[0]等于当前属性P0的LastEventNumber,因此就没必要再次下发P0了。