Player Flow Management in BigWorld
Client Login Flow
When a player logs in, the client first connects to LoginApp and issues a login request:
void LoginApp::login( const Mercury::Address & source,
Mercury::UnpackedMessageHeader & header,
BinaryIStream & data )
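This function is very long, so its body is not reproduced here. In outline, it does the following:
- If this LoginApp is not currently accepting external connections, reject the login.
- If the client's IP address is on the blacklist, reject the login.
- Compare the handshake protocol version passed in the request with the server's protocol version; if they differ, reject the login.
- If this client already has a login in progress, reject the login.
- Check the login rate limit. A configuration option caps how many players may log in within a given period; if the cap has been reached, reject the login.
- If this LoginApp is not yet connected to the database, reject the login.
- If this LoginApp's load is too high, reject the login.
- If a Challenge is configured on this LoginApp, decide whether one must be issued to this client. The client has to complete the Challenge correctly before the flow continues. A Challenge is normally a problem that is very CPU-intensive to solve but cheap to verify. A typical example is integer factorization: the server generates a random 256-byte integer and asks the client to compute its prime factors.
- Parse the username and password carried in the request and reject the login if either is too long. This data is encrypted with an RSA public key, and only the server holds the private key, so it is not exposed in transit.
- Check whether the client supplied a symmetric encryption key. If it did not and the server is configured to require encrypted connections, reject the login.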
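The checks above form a simple first-failure-wins chain. The following is a minimal sketch of that shape, not the real LoginApp::login: the LoginCheck codes, the LoginContext fields and the checkLogin function are all hypothetical names introduced for illustration, and the Challenge step is left out because it involves a round trip to the client.

```cpp
#include <cstddef>
#include <string>

// Hypothetical rejection codes; the names are illustrative, not BigWorld's.
enum class LoginCheck
{
    Ok,
    NotServing,
    AddressBanned,
    BadProtocolVersion,
    AlreadyInProgress,
    RateLimited,
    DatabaseNotReady,
    Overloaded,
    CredentialsTooLong,
    EncryptionRequired
};

// Everything the checks need, gathered in one place. All fields are assumed.
struct LoginContext
{
    bool isServing = true;
    bool isBanned = false;
    unsigned clientVersion = 1;
    unsigned serverVersion = 1;
    bool isInProgress = false;
    unsigned loginsThisPeriod = 0;
    unsigned maxLoginsPerPeriod = 100;
    bool isDatabaseReady = true;
    bool isOverloaded = false;
    std::string username;
    std::string password;
    std::size_t maxFieldLength = 256;
    bool hasEncryptionKey = false;
    bool requireEncryption = true;
};

// Runs the checks in the order listed above and stops at the first failure.
LoginCheck checkLogin( const LoginContext & ctx )
{
    if (!ctx.isServing) return LoginCheck::NotServing;
    if (ctx.isBanned) return LoginCheck::AddressBanned;
    if (ctx.clientVersion != ctx.serverVersion)
        return LoginCheck::BadProtocolVersion;
    if (ctx.isInProgress) return LoginCheck::AlreadyInProgress;
    if (ctx.loginsThisPeriod >= ctx.maxLoginsPerPeriod)
        return LoginCheck::RateLimited;
    if (!ctx.isDatabaseReady) return LoginCheck::DatabaseNotReady;
    if (ctx.isOverloaded) return LoginCheck::Overloaded;
    if (ctx.username.size() > ctx.maxFieldLength ||
        ctx.password.size() > ctx.maxFieldLength)
        return LoginCheck::CredentialsTooLong;
    if (!ctx.hasEncryptionKey && ctx.requireEncryption)
        return LoginCheck::EncryptionRequired;
    return LoginCheck::Ok;
}
```

Ordering the cheap, stateless checks first means an overloaded or unready server rejects a request before it spends any time decrypting credentials.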
Once all of these checks pass, LoginApp builds a DBAppInterface::logOn request for the database service to verify the login. The result is handled by the DatabaseReplyHandler created here:
INFO_MSG( "LoginApp::login: Logging in user '%s' (%s)\n",
pParams->username().c_str(),
source.c_str() );
// Remember that this attempt is now in progress and discard further
// attempts from that address for some time after it completes.
ClientLoginRequest & loginRequest = loginRequests_[ source ];
loginRequest.reset();
loginRequest.pChannel( pChannel );
loginRequest.pParams( pParams );
DatabaseReplyHandler * pDBHandler =
new DatabaseReplyHandler( *this, source, pChannel,
header.replyID, pParams );
Mercury::Bundle & dbBundle = this->dbAppAlpha().bundle();
dbBundle.startRequest( DBAppInterface::logOn, pDBHandler );
dbBundle << source << *pParams;
this->dbAppAlpha().send();
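When DBApp handles the logOn request, it checks whether the digest of the entity definitions supplied by the client matches the digest of the definitions the database was built with. A mismatch means the client cannot map the stored data correctly, so the login is rejected: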
/**
* This method handles a logOn request.
*/
void DBApp::logOn( const Mercury::Address & srcAddr,
const Mercury::UnpackedMessageHeader & header,
BinaryIStream & data )
{
Mercury::Address addrForProxy;
LogOnParamsPtr pParams = new LogOnParams();
data >> addrForProxy >> *pParams;
if (pParams->digest() != this->getEntityDefs().getDigest())
{
ERROR_MSG( "DBApp::logOn: Incorrect digest\n" );
this->sendFailure( header.replyID, srcAddr,
LogOnStatus::LOGIN_REJECTED_BAD_DIGEST,
"Defs digest mismatch." );
return;
}
this->logOn( srcAddr, header.replyID, pParams, addrForProxy );
}
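The overloaded logOn that is called next is not worth quoting in full. In outline:
- If the server is not ready yet, reject the login.
- If this process is heavily loaded, reject the login.
- If any CellApp in the cluster is heavily loaded, reject the login.
Once all of these checks pass, a LoginHandler is created and the rest of the work is delegated to it: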
LoginHandler * pHandler =
new LoginHandler( pParams, addrForProxy, srcAddr, replyID );
pHandler->login();
pHandler->login is made up of several asynchronous queries. The first verifies that the username and password match by querying the account database:
/**
* Start the login process
*/
void LoginHandler::login()
{
DBApp::instance().pBillingSystem()->getEntityKeyForAccount(
pParams_->username(), pParams_->password(), clientAddr_, *this );
// When getEntityKeyForAccount() completes, onGetEntityKeyForAccount*()
// will be called.
}
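The call above returns immediately and the result arrives later through a callback on the handler. Here is a minimal sketch of that pattern, assuming a toy in-memory account store; AccountLookupHandler, ToyBillingSystem and RecordingHandler are hypothetical names, and the real interface has more callbacks and richer arguments.

```cpp
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for the handler interface that LoginHandler passes
// as *this; the real interface has more callbacks.
struct AccountLookupHandler
{
    virtual ~AccountLookupHandler() {}
    virtual void onSuccess( long entityDbId ) = 0;
    virtual void onFailure( const std::string & reason ) = 0;
};

// Toy billing system: a lookup is queued and completed later, the way an
// asynchronous database query would be.
class ToyBillingSystem
{
public:
    void addAccount( const std::string & username,
        const std::string & password, long entityDbId )
    {
        accounts_[ username ] = Account{ password, entityDbId };
    }

    // Returns immediately; the handler is called from processPending().
    void getEntityKeyForAccount( const std::string & username,
        const std::string & password, AccountLookupHandler & handler )
    {
        pending_.push_back( [this, username, password, &handler]()
        {
            auto it = accounts_.find( username );
            if (it != accounts_.end() && it->second.password == password)
                handler.onSuccess( it->second.entityDbId );
            else
                handler.onFailure( "unknown user or wrong password" );
        } );
    }

    // Completes every queued lookup, as an event loop would.
    void processPending()
    {
        std::vector< std::function< void() > > ready;
        ready.swap( pending_ );
        for (auto & task : ready) task();
    }

private:
    struct Account { std::string password; long entityDbId; };
    std::unordered_map< std::string, Account > accounts_;
    std::vector< std::function< void() > > pending_;
};

// Handler that just records which callback fired.
struct RecordingHandler : AccountLookupHandler
{
    bool isDone = false;
    long entityDbId = 0;
    std::string error;
    void onSuccess( long id ) override { isDone = true; entityDbId = id; }
    void onFailure( const std::string & reason ) override
    { isDone = true; error = reason; }
};
```

The handler must outlive the pending query, which is why the real LoginHandler is heap-allocated and deletes itself only when the flow ends.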
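The query can complete through four callbacks. To keep things simple, we ignore the two that create a new account or a new character and look only at these two:
- onGetEntityKeyForAccountFailure: the user does not exist or the password does not match; the failure is reported straight back to LoginApp.
- onGetEntityKeyForAccountSuccess: the username and password match; the result carries the unique database identifier of the character that belongs to the account.
In BigWorld, accounts and character data map one to one, unlike the design in mosaic_game where one account can own several characters. There is therefore no character-selection step: as soon as the query returns, the EntityKey is used to query the character database and load all of the character's properties: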
/*
* IGetEntityKeyForAccountHandler override
*/
void LoginHandler::onGetEntityKeyForAccountSuccess( const EntityKey & ekey,
const BW::string & dataForClient,
const BW::string & dataForBaseEntity )
{
dataForClient_ = dataForClient;
dataForBaseEntity_ = dataForBaseEntity;
this->loadEntity( EntityDBKey( ekey ) );
}
/**
* This method loads the entity with the given key.
*/
void LoginHandler::loadEntity( const EntityDBKey & ekey )
{
entityKey_ = ekey;
// Start "create new base" message even though we're not sure entity
// exists. This is to take advantage of getEntity() streaming properties
// into the bundle directly.
pStrmDbID_ = DBApp::prepareCreateEntityBundle( entityKey_.typeID,
entityKey_.dbID, clientAddr_, this, bundle_, pParams_,
&dataForBaseEntity_ );
// Get entity data
pBaseRef_ = &baseRef_;
DBApp::instance().getEntity( ekey, &bundle_, true, *this );
// When getEntity() completes, onGetEntityCompleted() is called.
}
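DBApp::prepareCreateEntityBundle starts building the createEntity request that will be sent to BaseAppMgr. The request is still incomplete at this point because the character data has not been loaded yet. Once the data has been loaded, checkOutEntity is used to send the player data on to a BaseApp, where a Base object is created: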
/**
* This function checks out the login entity. Must be called after
* entity has been successfully retrieved from the database.
*/
void LoginHandler::checkOutEntity()
{
if ((pBaseRef_ == NULL) &&
DBApp::instance().onStartEntityCheckout( entityKey_ ))
{
// Not checked out and not in the process of being checked out.
DBApp::setBaseRefToLoggingOn( baseRef_, entityKey_.typeID );
pBaseRef_ = &baseRef_;
DBApp::instance().setBaseEntityLocation( entityKey_, baseRef_,
reserveBaseMailboxHandler_ );
// When completes, onReservedBaseMailbox() is called.
}
else // Checked out
{
DBApp::instance().onLogOnLoggedOnUser( entityKey_.typeID,
entityKey_.dbID, pParams_, clientAddr_, replyAddr_, replyID_,
pBaseRef_, dataForClient_, dataForBaseEntity_ );
delete this;
}
}
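The else branch of checkOutEntity handles the case where the account is already logged in and the new login displaces the old one; we can ignore it for now. DBApp::instance().setBaseEntityLocation first records in the database which BaseApp the character belongs to, even though no real BaseApp has been assigned yet (baseRef_ has only been set to a "logging on" placeholder by setBaseRefToLoggingOn). When that operation finishes, reserveBaseMailboxHandler_ is invoked, and its onReservedBaseMailbox callback triggers the sending of the bundle_ that was filled in earlier: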
/**
* This method is called when the record in bigworldLogOns has been created
* or returned.
*/
void LoginHandler::onReservedBaseMailbox( bool isOK, DatabaseID dbID )
{
if (isOK)
{
this->sendCreateEntityMsg();
}
else
{
DBApp::instance().onCompleteEntityCheckout( entityKey_, NULL );
// Something horrible like database disconnected or something.
this->sendFailureReply(
LogOnStatus::LOGIN_REJECTED_DB_GENERAL_FAILURE,
"Unexpected database failure." );
}
}
sendCreateEntityMsg performs the final send of the createEntity bundle_; the destination is BaseAppMgr:
/**
* This method sends the BaseAppMgrInterface::createEntity message.
* Assumes bundle has the right data.
*/
inline void LoginHandler::sendCreateEntityMsg()
{
INFO_MSG( "DBApp::logOn: %s\n", pParams_->username().c_str() );
DBApp::instance().baseAppMgr().send( &bundle_ );
}
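BaseAppMgr::createEntity uses load balancing to pick the least-loaded BaseApp whose load is below a configured threshold. If none is found, it tells DBApp that the request failed. Otherwise it forwards the entity-creation request to that BaseApp as createBaseWithCellData and then calls addEntity to update its load estimate for that BaseApp: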
/**
* This method handles the createEntity message. It is called by DBApp when
* logging in.
*/
void BaseAppMgr::createEntity( const Mercury::Address & srcAddr,
const Mercury::UnpackedMessageHeader & header,
BinaryIStream & data )
{
Mercury::Address baseAppAddr( 0, 0 );
BaseApp * pBest = baseApps_.findLeastLoadedApp();
// Some load-checking code omitted
// Copy the client endpoint address
baseAppAddr = pBest->externalAddr();
CreateBaseReplyHandler * pHandler =
new CreateBaseReplyHandler( srcAddr, header.replyID,
baseAppAddr );
// Tell the BaseApp about the client's new proxy
Mercury::Bundle & bundle = pBest->bundle();
bundle.startRequest( BaseAppIntInterface::createBaseWithCellData,
pHandler );
bundle.transfer( data, data.remainingLength() );
pBest->send();
// Update the load estimate.
pBest->addEntity();
}
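The selection step can be sketched as follows. This is an illustration only: ToyBaseApp, findLeastLoaded, noteEntityAdded and the per-entity load increment are assumptions, and the real findLeastLoadedApp and addEntity may weigh load differently.

```cpp
#include <cstddef>
#include <vector>

// Toy view of a BaseApp as the manager sees it. Fields are assumed.
struct ToyBaseApp
{
    int id;
    float load;
    int numEntities;
};

// Returns the least-loaded app, or nullptr when there are no apps or when
// even the best candidate is at or above the overload threshold.
ToyBaseApp * findLeastLoaded( std::vector< ToyBaseApp > & apps,
    float overloadThreshold )
{
    ToyBaseApp * pBest = nullptr;
    for (ToyBaseApp & app : apps)
    {
        if (pBest == nullptr || app.load < pBest->load)
        {
            pBest = &app;
        }
    }
    if (pBest != nullptr && pBest->load >= overloadThreshold)
    {
        return nullptr;
    }
    return pBest;
}

// Plays the role of addEntity(): bump the local estimate right away so the
// next request does not pick the same app before fresh load data arrives.
void noteEntityAdded( ToyBaseApp & app, float loadPerEntity )
{
    ++app.numEntities;
    app.load += loadPerEntity;
}
```

Updating the estimate locally matters during a login burst: without it, every request between two load reports would be sent to the same BaseApp.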
When BaseApp receives createBaseWithCellData, it forwards the call directly to the global EntityCreator:
/**
* This method creates a base entity on this app. It is used to create a client
* proxy or base entities.
*/
void BaseApp::createBaseWithCellData( const Mercury::Address& srcAddr,
const Mercury::UnpackedMessageHeader& header,
BinaryIStream & data )
{
pEntityCreator_->createBaseWithCellData( srcAddr, header, data,
pLoginHandler_.get() );
}
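EntityCreator::createBaseWithCellData uses the incoming data to decide whether to create a Base object, which has no client, or a Proxy object, which does. Proxy is a subclass of Base, which is why createBaseFromStream returns a BasePtr: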
/**
* This method creates a base entity on this app. It is used to create a client
* proxy or base entities.
*/
void EntityCreator::createBaseWithCellData( const Mercury::Address& srcAddr,
const Mercury::UnpackedMessageHeader& header,
BinaryIStream & data,
LoginHandler * pLoginHandler )
{
// TRACE_MSG( "BaseApp::createBaseWithCellData:\n" );
// The format of the data is as follows:
// EntityID id
// EntityTypeID typeId;
// DatabaseID databaseID;
//
// For proxy:
// clientAddr
//#include "address_load_pair.hpp"
// BASE_DATA
// CELL_DATA (if needed)
// Vector3 position (if needed)
// Vector3 direction (if needed)
Mercury::Address clientAddr = Mercury::Address::NONE;
BW::string encryptionKey;
BasePtr pBase =
this->createBaseFromStream( data, &clientAddr, &encryptionKey );
// Replying with an empty response is considered to be a failure report, so
// any of the early returns from this method will cause this ChannelSender
// to do just that.
Mercury::ChannelSender sender( BaseApp::getChannel( srcAddr ) );
sender.bundle().startReply( header.replyID );
EntityMailBoxRef ref;
ref.init();
if (!pBase)
{
sender.bundle() << ref;
return;
}
// Note: If the reply format changes, check that BaseApp::logOnAttempt is
// okay.
ref = pBase->baseEntityMailBoxRef();
sender.bundle() << ref;
// This is ugly. We should avoid differences in Base and Proxy.
if (pBase->isProxy() && clientAddr != Mercury::Address::NONE)
{
Proxy * pProxy = (Proxy*)pBase.get();
SessionKey loginKey = pProxy->prepareForLogin( clientAddr );
sender.bundle() << loginKey;
if (!encryptionKey.empty())
{
pProxy->encryptionKey( encryptionKey );
}
}
}
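Once the Proxy has been created, the BaseApp replies to BaseAppMgr, which issued the request, to report that the createEntity operation is complete. The first item in the reply is the Base object's mailbox, baseEntityMailBoxRef. If the object is a Proxy and the client address passed in is not empty, Proxy::prepareForLogin(clientAddr) is called to obtain a loginKey that serves as the login session ID, and the loginKey is written to the reply bundle. The job of producing the loginKey is forwarded through several calls and finally lands in PendingLogins::add: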
/**
* This method adds a proxy to the set of pending logins. A baseAppLogin
* request is now expected from the client to take it out of this set.
*/
SessionKey PendingLogins::add( Proxy * pProxy,
const Mercury::Address & loginAppAddr )
{
SessionKey loginKey = pProxy->sessionKey();
pProxy->regenerateSessionKey();
// Make sure proxy is only in the pending list once.
// Note: Brute-force but not too much of an issue here.
for (iterator iter = container_.begin(); iter != container_.end(); ++iter)
{
if (iter->second.pProxy() == pProxy)
{
container_.erase( iter );
break;
}
}
container_.insert( Container::value_type( loginKey,
PendingLogin( pProxy, loginAppAddr ) ) );
// Could make this configurable.
const int PENDING_LOGINS_TIMEOUT = 30; // 30 seconds
queue_.push_back( QueueElement(
BaseApp::instance().time() +
PENDING_LOGINS_TIMEOUT * BaseAppConfig::updateHertz(),
pProxy->id(), loginKey ) );
return loginKey;
}
/**
* When a client logs in, we give a different session key to the login key
* used to first connect. This one makes a new one, presumably it should only
* be used soon before the key is sent to the client.
*/
void Proxy::regenerateSessionKey()
{
do
{
// TODO: Not sure why this cannot be 0. If anyone finds out why, write
// a comment!!
sessionKey_ = uint32( timestamp() );
}
while (sessionKey_ == 0);
}
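The rule PendingLogins::add uses for the loginKey is simple: it takes the Proxy's current session key, which regenerateSessionKey derives from the current timestamp truncated to 32 bits, and then immediately regenerates the Proxy's session key. regenerateSessionKey is also called when the Proxy object is created to initialise the key, so different clients that log in through the same Proxy get different identifiers. After the loginKey has been obtained, the triple (loginKey, proxy, loginAppAddr) is put into the pending-login container, and an entry is also pushed onto a timeout queue so that stale entries are removed automatically. The timeout defaults to 30 seconds and is stored in game ticks, as 30 * updateHertz.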
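The combination of a keyed container and a deadline queue can be sketched like this. It is a toy under stated assumptions: ToyPendingLogins, take and tick are hypothetical names, proxies are represented by plain integer IDs, and time is passed in explicitly as a tick count.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>

using SessionKey = std::uint32_t;
using Tick = std::uint64_t;

class ToyPendingLogins
{
public:
    explicit ToyPendingLogins( Tick timeoutTicks ) :
        timeoutTicks_( timeoutTicks ) {}

    // Registers proxyId under loginKey. An earlier entry for the same proxy
    // is removed first, so a proxy is pending at most once.
    void add( SessionKey loginKey, int proxyId, Tick now )
    {
        for (auto it = byKey_.begin(); it != byKey_.end(); ++it)
        {
            if (it->second == proxyId)
            {
                byKey_.erase( it );
                break;
            }
        }
        byKey_[ loginKey ] = proxyId;
        expiry_.push_back( Entry{ now + timeoutTicks_, proxyId, loginKey } );
    }

    // Removes and returns the proxy pending under loginKey, or -1 if none.
    int take( SessionKey loginKey )
    {
        auto it = byKey_.find( loginKey );
        if (it == byKey_.end()) return -1;
        int proxyId = it->second;
        byKey_.erase( it );
        return proxyId;
    }

    // Drops entries whose deadline has passed. The queue is already in
    // deadline order because every entry uses the same timeout.
    void tick( Tick now )
    {
        while (!expiry_.empty() && expiry_.front().deadline <= now)
        {
            const Entry expired = expiry_.front();
            expiry_.pop_front();
            auto it = byKey_.find( expired.loginKey );
            if (it != byKey_.end() && it->second == expired.proxyId)
            {
                byKey_.erase( it );
            }
        }
    }

    std::size_t size() const { return byKey_.size(); }

private:
    struct Entry { Tick deadline; int proxyId; SessionKey loginKey; };
    Tick timeoutTicks_;
    std::map< SessionKey, int > byKey_;
    std::deque< Entry > expiry_;
};
```

Because the timeout is constant, a plain FIFO queue is enough for expiry and no priority queue is needed.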
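When BaseAppMgr receives the reply to createEntity, its CreateBaseReplyHandler forwards the data to the owning LoginHandler on DBApp. After parsing out the address of the created Proxy, proxyAddr, and the matching baseRef, the handler calls DBApp::instance().setBaseEntityLocation to record the mapping from entityKey_ to baseRef. This records which BaseApp currently hosts the Proxy for this account: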
/*
* Mercury::ReplyMessageHandler override.
*/
void LoginHandler::handleMessage( const Mercury::Address & source,
Mercury::UnpackedMessageHeader & header,
BinaryIStream & data,
void * arg )
{
Mercury::Address proxyAddr;
data >> proxyAddr;
if (proxyAddr.ip == 0)
{
LogOnStatus::Status status;
switch (proxyAddr.port)
{
case BaseAppMgrInterface::CREATE_ENTITY_ERROR_NO_BASEAPPS:
status = LogOnStatus::LOGIN_REJECTED_NO_BASEAPPS;
break;
case BaseAppMgrInterface::CREATE_ENTITY_ERROR_BASEAPPS_OVERLOADED:
status = LogOnStatus::LOGIN_REJECTED_BASEAPP_OVERLOAD;
break;
default:
status = LogOnStatus::LOGIN_CUSTOM_DEFINED_ERROR;
break;
}
this->handleFailure( &data, status );
}
else
{
data >> baseRef_;
bundle_.clear();
bundle_.startReply( replyID_ );
// Assume success.
bundle_ << (uint8)LogOnStatus::LOGGED_ON;
bundle_ << proxyAddr;
// session key
MF_ASSERT_DEV( data.remainingLength() == sizeof( SessionKey ) );
bundle_.transfer( data, data.remainingLength() );
bundle_ << dataForClient_;
if (entityKey_.dbID != 0)
{
pBaseRef_ = &baseRef_;
DBApp::instance().setBaseEntityLocation( entityKey_,
baseRef_, setBaseMailboxHandler_ );
// When completes, onSetBaseMailbox() is called.
}
else
{
// Must be "createUnknown", and "rememberUnknown" is false.
this->sendReply();
}
}
}
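Note how the reply reuses the address field as an error channel: an IP of 0 means failure, and the port then carries the error code. A minimal sketch of that decoding, with hypothetical type names and placeholder numeric values for the error codes:

```cpp
#include <cstdint>

struct ToyAddress
{
    std::uint32_t ip;
    std::uint16_t port;
};

// Placeholder values; the real constants live in BaseAppMgrInterface.
enum ToyCreateError : std::uint16_t
{
    TOY_ERROR_NO_BASEAPPS = 1,
    TOY_ERROR_BASEAPPS_OVERLOADED = 2
};

enum class ToyLogOnStatus
{
    LoggedOn,
    NoBaseApps,
    BaseAppOverload,
    CustomError
};

// A zero IP marks a failed reply; the port is then read as an error code.
ToyLogOnStatus statusFromReplyAddress( const ToyAddress & addr )
{
    if (addr.ip != 0)
    {
        return ToyLogOnStatus::LoggedOn;
    }
    switch (addr.port)
    {
    case TOY_ERROR_NO_BASEAPPS:
        return ToyLogOnStatus::NoBaseApps;
    case TOY_ERROR_BASEAPPS_OVERLOADED:
        return ToyLogOnStatus::BaseAppOverload;
    default:
        return ToyLogOnStatus::CustomError;
    }
}
```

This keeps the reply layout identical for success and failure, at the cost of making 0 an unusable IP value.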
setBaseEntityLocation records the Proxy's MailBox in the database. putExplicitID is passed as false, and the mailbox address is passed along as well:
void setBaseEntityLocation( const EntityKey & entityKey,
EntityMailBoxRef & mailbox,
IDatabase::IPutEntityHandler & handler,
UpdateAutoLoad updateAutoLoad = UPDATE_AUTO_LOAD_RETAIN )
{
this->putEntity( entityKey, mailbox.id, NULL, &mailbox,
false, false,
updateAutoLoad,
handler );
}
/**
* This method is meant to be called instead of IDatabase::putEntity() so that
* we can muck around with stuff before passing it to IDatabase.
*/
void DBApp::putEntity( const EntityKey & entityKey,
EntityID entityID,
BinaryIStream * pStream,
EntityMailBoxRef * pBaseMailbox,
bool removeBaseMailbox,
bool putExplicitID,
UpdateAutoLoad updateAutoLoad,
IDatabase::IPutEntityHandler& handler )
{
// Update mailbox for dead BaseApps.
if (this->hasMailboxRemapping() && pBaseMailbox)
{
// Update mailbox for dead BaseApps.
this->remapMailbox( *pBaseMailbox );
}
pDatabase_->putEntity( entityKey, entityID,
pStream, pBaseMailbox, removeBaseMailbox,
putExplicitID, updateAutoLoad, handler );
}
/**
* Override from IDatabase
*/
void MySqlDatabase::putEntity( const EntityKey & entityKey,
EntityID entityID,
BinaryIStream * pStream,
const EntityMailBoxRef * pBaseMailbox,
bool removeBaseMailbox,
bool putExplicitID,
UpdateAutoLoad updateAutoLoad,
IPutEntityHandler & handler )
{
const EntityTypeMapping * pEntityTypeMapping =
entityTypeMappings_[ entityKey.typeID ];
if (pEntityTypeMapping == NULL)
{
ERROR_MSG( "MySqlDatabase::putEntity: Entity with id \'%d\' is invalid."
" Aborting. Please remove from entities.xml or fix def"
" and script of this entity. ", entityKey.typeID );
handler.onPutEntityComplete( false, entityKey.dbID );
return;
}
// Note: gameTime is provided to PutEntityTask via the stream
pBufferedEntityTasks_->addBackgroundTask(
new PutEntityTask( pEntityTypeMapping,
entityKey.dbID, entityID,
pStream, pBaseMailbox, removeBaseMailbox, putExplicitID,
updateAutoLoad, handler ) );
}
The call is passed all the way down to MySqlDatabase::putEntity, which creates a PutEntityTask and adds it to the pBufferedEntityTasks_ queue for later execution. For this call the constructor copies pBaseMailbox into baseMailbox_ and sets writeBaseMailbox_ to true. putExplicitID_ is false, updateAutoLoad_ is UPDATE_AUTO_LOAD_RETAIN, and writeEntityData_ stays false because pStream is NULL:
/**
* Constructor.
*
* Stores all required information so that the task can be executed in a
* separate thread.
*/
PutEntityTask::PutEntityTask( const EntityTypeMapping * pEntityTypeMapping,
DatabaseID databaseID,
EntityID entityID,
BinaryIStream * pStream,
const EntityMailBoxRef * pBaseMailbox,
bool removeBaseMailbox,
bool putExplicitID,
UpdateAutoLoad updateAutoLoad,
IDatabase::IPutEntityHandler & handler,
GameTime * pGameTime ) :
EntityTaskWithID( *pEntityTypeMapping, databaseID, entityID, "PutEntityTask" ),
writeEntityData_( false ),
writeBaseMailbox_( false ),
removeBaseMailbox_( removeBaseMailbox ),
putExplicitID_( putExplicitID ),
updateAutoLoad_( updateAutoLoad ),
handler_( handler ),
pGameTime_( pGameTime )
{
if (pStream != NULL)
{
stream_.transfer( *pStream, pStream->remainingLength() );
writeEntityData_ = true;
}
if (pBaseMailbox)
{
baseMailbox_ = *pBaseMailbox;
writeBaseMailbox_ = true;
}
}
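The constructor's idiom, nullable pointers for optional inputs that are copied into the task together with a flag, can be shown in isolation. ToyPutTask and ToyMailbox below are hypothetical simplifications; the point is only that the task owns copies and therefore does not depend on caller-owned memory when it later runs on another thread.

```cpp
#include <string>

struct ToyMailbox
{
    int id;
};

// Toy counterpart of PutEntityTask: optional inputs arrive as nullable
// pointers, and the constructor copies whatever is present.
struct ToyPutTask
{
    bool writeEntityData;
    bool writeBaseMailbox;
    std::string entityData;
    ToyMailbox baseMailbox;

    ToyPutTask( const std::string * pEntityData,
            const ToyMailbox * pMailbox ) :
        writeEntityData( false ),
        writeBaseMailbox( false ),
        entityData(),
        baseMailbox{ 0 }
    {
        if (pEntityData != nullptr)
        {
            entityData = *pEntityData;
            writeEntityData = true;
        }
        if (pMailbox != nullptr)
        {
            baseMailbox = *pMailbox;
            writeBaseMailbox = true;
        }
    }
};
```

In the login path only the mailbox pointer is non-null, which is why only the mailbox branch of the background task does any work.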
When the PutEntityTask runs, PutEntityTask::performBackgroundTask is called. It takes the writeBaseMailbox_ branch, where entityTypeMapping_.addLogOnRecord writes the Proxy's MailBox to the database:
/**
* This method writes the entity data into the database.
*/
void PutEntityTask::performBackgroundTask( MySql & conn )
{
bool definitelyExists = false;
MF_ASSERT( dbID_ != PENDING_DATABASE_ID );
if (writeEntityData_)
{
// Irrelevant code omitted
}
if (writeBaseMailbox_)
{
// Check for existence to prevent adding invalid LogOn records
if (definitelyExists ||
entityTypeMapping_.checkExists( conn, dbID_ ))
{
// Add or update the log on record.
entityTypeMapping_.addLogOnRecord( conn,
dbID_, baseMailbox_ );
}
}
// Some code omitted
}
EntityTypeMapping::addLogOnRecord simply writes the Proxy's MailBox into the bigworldLogOns table:
namespace
{
const Query addLogOnQuery(
"INSERT INTO bigworldLogOns "
"(databaseID, typeID, objectID, ip, port, salt) "
"VALUES (?,?,?,?,?,?) "
"ON DUPLICATE KEY "
"UPDATE "
"objectID = VALUES(objectID), "
"ip = VALUES(ip), "
"port = VALUES(port), "
"salt = VALUES(salt)" );
}
/**
*
*/
void EntityTypeMapping::addLogOnRecord( MySql & connection,
DatabaseID dbID, const EntityMailBoxRef & mailbox ) const
{
addLogOnQuery.execute( connection,
dbID, this->getDatabaseTypeID(),
mailbox.id,
htonl( mailbox.addr.ip ),
htons( mailbox.addr.port ),
mailbox.addr.salt,
NULL );
}
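The query is an upsert: a new row is inserted, or the existing row with the same key is overwritten. The in-memory sketch below mimics that behaviour. It assumes the table's unique key is (databaseID, typeID), which is inferred from the fact that those two columns are the only ones not listed in the UPDATE clause; the ToyLogOn names are hypothetical.

```cpp
#include <cstdint>
#include <map>
#include <utility>

struct ToyLogOnRecord
{
    int objectID;
    std::uint32_t ip;
    std::uint16_t port;
    std::uint16_t salt;
};

// Assumed key: (databaseID, typeID).
typedef std::pair< std::int64_t, int > ToyLogOnKey;
typedef std::map< ToyLogOnKey, ToyLogOnRecord > ToyLogOnTable;

// Inserts the record, or overwrites the existing one for the same key.
// Returns true when a new row was created.
bool addLogOnRecord( ToyLogOnTable & table, std::int64_t dbID, int typeID,
    const ToyLogOnRecord & record )
{
    std::pair< ToyLogOnTable::iterator, bool > result =
        table.insert( std::make_pair( ToyLogOnKey( dbID, typeID ), record ) );
    if (!result.second)
    {
        result.first->second = record;
    }
    return result.second;
}
```

This explains why the login flow can call setBaseEntityLocation twice for the same entity: the second call replaces the placeholder written by the first.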
When this database task completes, the callbacks unwind to the setBaseMailboxHandler_ that was supplied to setBaseEntityLocation. That in turn calls LoginHandler::onSetBaseMailbox, which sends the reply built earlier back to LoginApp:
/**
* This method is called when the record in bigworldLogOns has been set.
*/
void LoginHandler::onSetBaseMailbox( bool isOK, DatabaseID dbID )
{
DBApp::instance().onCompleteEntityCheckout( entityKey_,
isOK ? &baseRef_ : NULL );
if (isOK)
{
this->sendReply();
}
else
{
// Something horrible like database disconnected or something.
this->sendFailureReply(
LogOnStatus::LOGIN_REJECTED_DB_GENERAL_FAILURE,
"Unexpected database failure." );
}
}
The reply contains the Proxy's address and the corresponding loginKey. When LoginApp receives it, it does not parse the two fields separately but reads them into a LoginReplyRecord:
/**
* This structure contains the reply from a successful login.
*/
struct LoginReplyRecord
{
Mercury::Address serverAddr; // send to here
uint32 sessionKey; // use this session key
};
/**
* This method is called when a message comes back from the system.
* It deletes itself at the end.
*/
void DatabaseReplyHandler::handleMessage(
const Mercury::Address & /*source*/,
Mercury::UnpackedMessageHeader & header,
BinaryIStream & data,
void * /*arg*/ )
{
uint8 status;
data >> status;
if (status != LogOnStatus::LOGGED_ON)
{
// Extensive error handling omitted
delete this;
return;
}
if (data.remainingLength() < int(sizeof( LoginReplyRecord )))
{
// Some error handling omitted
delete this;
return;
}
LoginReplyRecord lrr;
data >> lrr;
BW::string serverMsg;
if (data.remainingLength() > 0)
{
data >> serverMsg;
}
// Network NAT mapping code omitted
loginApp_.sendAndCacheSuccess( clientAddr_, pChannel_.get(),
replyID_, lrr, serverMsg, pParams_ );
delete this;
}
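The handler checks the remaining length before it deserialises the fixed-size record. A minimal sketch of that defensive read follows. ToyReader, ToyLoginReply and the 10-byte wire layout (4-byte IP, 2-byte port, 4-byte session key, host byte order) are assumptions made for illustration and are not the real Mercury stream format.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

struct ToyLoginReply
{
    std::uint32_t ip;
    std::uint16_t port;
    std::uint32_t sessionKey;
};

// Assumed wire size: the fields packed back to back with no padding.
const std::size_t kToyLoginReplyWireSize = 4 + 2 + 4;

// Cursor over a byte buffer. The buffer must outlive the reader.
class ToyReader
{
public:
    explicit ToyReader( const std::vector< unsigned char > & buf ) :
        buf_( buf ), pos_( 0 ) {}

    std::size_t remaining() const { return buf_.size() - pos_; }

    // Copies sizeof(T) bytes into out; fails without consuming if too short.
    template < class T >
    bool read( T & out )
    {
        if (remaining() < sizeof( T )) return false;
        std::memcpy( &out, buf_.data() + pos_, sizeof( T ) );
        pos_ += sizeof( T );
        return true;
    }

private:
    const std::vector< unsigned char > & buf_;
    std::size_t pos_;
};

// Checks the total length up front so a short message consumes nothing.
bool parseLoginReply( ToyReader & in, ToyLoginReply & out )
{
    if (in.remaining() < kToyLoginReplyWireSize)
    {
        return false;
    }
    return in.read( out.ip ) && in.read( out.port ) &&
        in.read( out.sessionKey );
}
```

Checking the whole record size first avoids leaving the stream half-consumed when a truncated reply arrives.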
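sendAndCacheSuccess stores the login result in the loginRequests_ map, keyed by client address with a ClientLoginRequest as the value. After filling in the ClientLoginRequest, it calls sendSuccess to send the successful result down to the client: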
/**
* This method sends a reply to a client indicating that logging in has been
* successful. It also caches this information so that it can be resent if
* necessary.
*/
void LoginApp::sendAndCacheSuccess( const Mercury::Address & addr,
Mercury::Channel * pChannel, Mercury::ReplyID replyID,
const LoginReplyRecord & replyRecord,
const BW::string & serverMsg, LogOnParamsPtr pParams )
{
ClientLoginRequest & request = loginRequests_[ addr ];
request.setData( replyRecord, serverMsg );
MF_ASSERT_DEV( *pParams == *request.pParams() );
this->sendSuccess( addr, pChannel, replyID, request );
// Do not let the map get too big. Just check every so often to get rid of
// old caches.
// Code that purges expired loginRequests_ entries omitted
}
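When sendSuccess sends the result, it encrypts the data with encryptionKey, the symmetric key supplied by the client, if one was provided, and then calls sendRawReply to send it to the client: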
/**
* This method sends a reply to a client indicating that logging in has been
* successful.
*/
void LoginApp::sendSuccess( const Mercury::Address & addr,
Mercury::Channel * pChannel, Mercury::ReplyID replyID,
const ClientLoginRequest & request )
{
MemoryOStream data;
data << (int8)LogOnStatus::LOGGED_ON;
const BW::string & encryptionKey = request.pParams()->encryptionKey();
if (!encryptionKey.empty())
{
// We have to encrypt the reply record because it contains the session
// key
Mercury::EncryptionFilterPtr pFilter =
Mercury::EncryptionFilter::create(
Mercury::SymmetricBlockCipher::create( encryptionKey ) );
MemoryOStream clearText;
request.writeSuccessResultToStream( clearText );
pFilter->encryptStream( clearText, data );
}
else
{
request.writeSuccessResultToStream( data );
}
loginStats_.incSuccesses();
++gNumLogins;
this->sendRawReply( addr, pChannel, replyID, data );
}
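After loginApp_.sendAndCacheSuccess returns, the DatabaseReplyHandler deletes itself, and LoginApp's part of the login flow is finished.
When the client receives the success message, it has the Proxy's address and the login session identifier, the loginKey. It then sends a baseAppLogin request to the BaseApp that hosts the Proxy: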
/**
* This method handles a message from the client. Should be the first message
* received from the client.
*/
void BaseApp::baseAppLogin( const Mercury::Address& srcAddr,
const Mercury::UnpackedMessageHeader& header,
const BaseAppExtInterface::baseAppLoginArgs & args )
{
pLoginHandler_->login( extInterface_, srcAddr, header, args );
}
/**
* This method is called by a client to make initial contact with the BaseApp.
* It should be called after the client has logged in via the LoginApp.
*/
void LoginHandler::login( Mercury::NetworkInterface & networkInterface,
const Mercury::Address& srcAddr,
const Mercury::UnpackedMessageHeader& header,
const BaseAppExtInterface::baseAppLoginArgs & args )
{
PendingLogins::iterator pendingIter = pPendingLogins_->find( args.key );
if (pendingIter == pPendingLogins_->end())
{
INFO_MSG( "LoginHandler::login(%s): "
"No pending login for loginKey %u. Attempt = %u\n",
srcAddr.c_str(), args.key, args.numAttempts );
// Bad bundle so break out of dispatching the rest.
header.breakBundleLoop();
return;
}
const PendingLogin & pending = pendingIter->second;
SmartPointer<Proxy> pProxy = pending.pProxy();
if (pProxy->isDestroyed())
{
return;
}
if (networkInterface.findChannel( srcAddr ) != NULL)
{
++numLoginCollisions_;
INFO_MSG( "LoginHandler::login(%s): "
"%u collided with an existing channel. Attempt = %u\n",
srcAddr.c_str(), pProxy->id(), args.numAttempts );
return;
}
this->updateStatistics( srcAddr, pending.addrFromLoginApp(), args.numAttempts );
pPendingLogins_->erase( pendingIter );
if (pProxy->attachToClient( srcAddr, header.replyID,
header.pChannel.get() ))
{
INFO_MSG( "LoginHandler::login: "
"%u attached from %s. Attempt %u\n",
pProxy->id(), srcAddr.c_str(), args.numAttempts );
}
}
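The decision sequence in LoginHandler::login can be condensed into a small function. In this sketch the names ToyProxy, ToyLoginResult and handleBaseAppLogin are hypothetical, client addresses are plain strings, and the set of active channels stands in for networkInterface.findChannel.

```cpp
#include <cstdint>
#include <map>
#include <set>
#include <string>

enum class ToyLoginResult
{
    NoPendingLogin,
    ProxyDestroyed,
    ChannelCollision,
    Attached
};

struct ToyProxy
{
    int id;
    bool isDestroyed;
    std::string clientAddr;
};

// Mirrors the order of checks above. As in the original, the pending entry
// is erased only once every check has passed.
ToyLoginResult handleBaseAppLogin(
    std::map< std::uint32_t, ToyProxy * > & pending,
    const std::set< std::string > & activeChannels,
    std::uint32_t loginKey,
    const std::string & srcAddr )
{
    auto it = pending.find( loginKey );
    if (it == pending.end())
    {
        return ToyLoginResult::NoPendingLogin;
    }
    ToyProxy * pProxy = it->second;
    if (pProxy->isDestroyed)
    {
        return ToyLoginResult::ProxyDestroyed;
    }
    if (activeChannels.count( srcAddr ) != 0)
    {
        return ToyLoginResult::ChannelCollision;
    }
    pending.erase( it );
    pProxy->clientAddr = srcAddr;
    return ToyLoginResult::Attached;
}
```

Leaving the entry in place on a collision lets the client retry with the same loginKey until the pending login times out.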
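The handler reads args.key, which is the loginKey recorded when the Proxy was created, and uses it to look up the associated Proxy. Finally attachToClient binds the client's address to the Proxy's client channel, pClientChannel_: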
/**
* This method attaches this proxy to the client at the input address.
* It is only used the first time this client is attached to a proxy on this
* BaseApp, local handoffs are entirely handled by @see giveClientTo.
*
* @param clientAddr The client address.
* @param loginReplyID The reply ID of the login request message.
* @param pChannel If this is a Mercury/TCP request, the associated
* TCP channel, otherwise NULL.
*/
bool Proxy::attachToClient( const Mercury::Address & clientAddr,
Mercury::ReplyID loginReplyID,
Mercury::Channel * pChannel )
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
IF_NOT_MF_ASSERT_DEV( pClientChannel_ == NULL )
{
return false;
}
BaseApp & baseApp = BaseApp::instance();
bool hasClient = (clientAddr != Mercury::Address::NONE);
if (hasClient)
{
// Create the BlockCipher to encrypt the channel with.
Mercury::BlockCipherPtr pBlockCipher = NULL;
if (encryptionKey_.empty())
{
ERROR_MSG( "Proxy::attachToClient( %s ): "
"No session encryption key, falling back to unencrypted "
"connection\n",
clientAddr.c_str() );
}
else
{
pBlockCipher =
Mercury::SymmetricBlockCipher::create( encryptionKey_ );
if (!pBlockCipher)
{
ERROR_MSG( "Proxy::attachToClient( %s ): "
"Invalid encryption key, falling back to unencrypted "
"connection\n",
clientAddr.c_str() );
}
}
if (pChannel == NULL)
{
// TCP Channels have already been created before we process
// bundles, UDP channels need to be created explicitly.
pChannel = new Mercury::UDPChannel(
baseApp.extInterface(),
clientAddr,
Mercury::UDPChannel::EXTERNAL,
MIN_CLIENT_INACTIVITY_RESEND_DELAY );
}
if (pBlockCipher)
{
pChannel->setEncryption( pBlockCipher );
}
this->setClientChannel( pChannel );
// Some code omitted
// now we are ready for the world to know about us.
baseApp.addProxy( this );
// create an object to push ourselves internally
MF_ASSERT( pProxyPusher_ == NULL );
if (!this->hasCellEntity())
{
pProxyPusher_ = new ProxyPusher( this );
}
// Send the login reply before anything else (if required). This must
// be the first message on this channel and must be on a bundle by
// itself. Even though the login message is off-channel (because it has
// to be - we don't know the client's address until we get it), we want
// to send back the reply on the channel because the client has a
// channel for us now and if we send this off-channel the PacketFilters
// won't work. Also - this means that all downstream traffic to the
// client is filtered (i.e. encrypted).
if (loginReplyID != Mercury::REPLY_ID_NONE)
{
// Make a new session key to send with the reply.
// Don't buffer this messages behind createBasePlayer
Mercury::Bundle & bundle = pClientChannel_->bundle();
bundle.startReply( loginReplyID );
bundle << sessionKey_;
this->sendBundleToClient();
}
// Now that the external interface will have the ClientInterface
// registered, we can prime bundles with those messages.
pClientChannel_->bundlePrimer( &clientBundlePrimer_ );
// Don't buffer these messages behind createBasePlayer
Mercury::Bundle & b = pClientChannel_->bundle();
ClientInterface::updateFrequencyNotificationArgs & frequencyArgs =
ClientInterface::updateFrequencyNotificationArgs::start( b );
frequencyArgs.hertz = uint8(BaseAppConfig::updateHertz());
ClientInterface::setGameTimeArgs::start( b ).gameTime = baseApp.time();
// Some code omitted
}
else
{
INFO_MSG( "Proxy::attachToClient: "
"Channel not created for %u. No client yet.\n", id_ );
}
return true;
}
This completes the client login flow. The overall flow is shown in the figure below:

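The client now knows the address of its Proxy, and the Proxy has a channel to the client stored in pClientChannel_ (for UDP, a UDPChannel created from clientAddr). When an encryption key was supplied, traffic in both directions is encrypted with the symmetric key encryptionKey_.
Once the client connection is established, the channel is up and messages can be sent and received over it in order until something abnormal happens. There are three main cases: voluntary logout, timeout disconnection, and being displaced by another login on the same account. The following sections go through how each is handled.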
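Client Logout Flow
When the client wants to tell the server that the current character is logging out voluntarily, it sends a disconnectClient message. On receiving it, the server calls Proxy::disconnectClient, which calls onClientDeath with its first parameter, reason, set to CLIENT_DISCONNECT_CLIENT_REQUESTED: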
/**
* This method handles a message from the client telling us that we should
* disconnect it.
*/
void Proxy::disconnectClient(
const BaseAppExtInterface::disconnectClientArgs & args )
{
this->onClientDeath( CLIENT_DISCONNECT_CLIENT_REQUESTED );
}
onClientDeath, which runs on the Proxy that the channel is bound to, handles the client disconnect and delegates the actual work to logOffClient:
/**
* This gets called when a client dies. Currently, we use it to not send them
* any more packets. It also informs the client's relatives (the Cell), which
* grieves for a few microseconds then removes it.
*/
void Proxy::onClientDeath( ClientDisconnectReason reason,
bool shouldExpectClient /* = true */ )
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
TRACE_MSG( "Proxy::onClientDeath: Client id %u has disconnected (%s)\n",
this->id(), clientDisconnectReasonToString( reason ) );
// we can be called again if we are already dead if we try to send more
// stuff to the client ... for now we just ignore it but could do no more...
if (!this->hasClient() && shouldExpectClient)
{
return; // already dead and told the cell about it
}
if (shouldExpectClient || this->hasClient())
{
// If we don't expect a client, we don't care about finalising
// acks from the channel
bool shouldCondemnClient = shouldExpectClient;
if (reason == CLIENT_DISCONNECT_TIMEOUT ||
reason == CLIENT_DISCONNECT_RATE_LIMITS_EXCEEDED )
{
// We don't care about finalising acks from a timed-out channel
shouldCondemnClient = false;
}
this->logOffClient( shouldCondemnClient );
}
// OK, we know the cell and we haven't told it about it yet. Call away
PyObject * pFunc = PyObject_GetAttrString( this, "onClientDeath" );
// The script object's onClientDeath callback is invoked here
}
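The way onClientDeath derives the condemn flag can be isolated as a pure function. The enum below is a hypothetical stand-in for ClientDisconnectReason with only the cases that matter here.

```cpp
// Hypothetical subset of the disconnect reasons.
enum class ToyDisconnectReason
{
    ClientRequested,
    Timeout,
    RateLimitsExceeded,
    Other
};

// Mirrors the logic above: condemn (close gracefully) only when a client
// was expected and the channel is still likely to be in a good state.
bool shouldCondemnChannel( ToyDisconnectReason reason,
    bool shouldExpectClient )
{
    if (reason == ToyDisconnectReason::Timeout ||
        reason == ToyDisconnectReason::RateLimitsExceeded)
    {
        // Nobody is waiting for final acks on such a channel.
        return false;
    }
    return shouldExpectClient;
}
```

For a voluntary logout with the default shouldExpectClient of true, the function returns true, so the channel is shut down gracefully.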
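logOffClient takes one parameter, shouldCondemnChannel (onClientDeath passes its local shouldCondemnClient), which says whether the underlying channel should be kept alive for a while when the client is disconnected rather than destroyed immediately. For a voluntary logout it is true: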
/**
* Send the client a disconnect message, then disconnect.
*
* @param shouldCondemnChannel True if any client channel should be condemned,
* false if it should be immediately destroyed.
* Generally true unless it was a client time-out
* or other situation where the client channel is
* probably no longer in a good state.
*/
void Proxy::logOffClient( bool shouldCondemnChannel )
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
// Abort any pending downloads, and then tell the script about it after
// the client has gone.
DownloadCallbacks callbacks;
dataDownloads_.abortDownloads( callbacks );
if (this->isClientConnected())
{
// Send a message to the client telling it that it has been logged off.
// The reason parameter is not yet used.
Mercury::Bundle & bundle = pClientChannel_->bundle();
ClientInterface::loggedOffArgs::start( bundle ).reason = 0;
this->sendBundleToClient();
}
this->detachFromClient( shouldCondemnChannel );
callbacks.triggerCallbacks( this );
}
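The shouldCondemnChannel argument is passed on to detachFromClient, which does the actual disconnection:
- If it is true, channel->shutDown() is called. The channel is kept so that it can go on sending and receiving the remaining acknowledgements (ACKs) and finish delivering pending reliable data before it closes. This is a graceful disconnect that lets trailing packets and retransmissions complete.
- If it is false, channel->destroy() is called. The channel is destroyed immediately without waiting for ACKs or retransmissions. This is used for timeouts, rate-limit violations, or a channel that is no longer usable.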
/**
* Finalise a session with the currently attached client.
*
* @param shouldCondemn If true, the client channel is condemned (and so
* will continue to send acknowledgements for some
* time), otherwise, the channel is reset.
*/
void Proxy::detachFromClient( bool shouldCondemn )
{
isGivingClientAway_ = false;
if (this->hasClient())
{
BaseApp::instance().removeProxy( this );
}
if (pClientChannel_ != NULL)
{
// Put aside the pointer which setClientChannel
// uses and then sets to NULL, so we can clean it up.
pClientChannel_->pChannelListener( NULL );
Mercury::ChannelPtr pSavedClientChannel = pClientChannel_;
this->setClientChannel( NULL );
if (pSavedClientChannel->isConnected())
{
if (shouldCondemn)
{
pSavedClientChannel->shutDown();
}
else
{
pSavedClientChannel->destroy();
}
}
}
// Don't try to disable the witness if we've already sent the
// destroyCell message.
if (cellHasWitness_ && this->shouldSendToCell())
{
this->sendEnableDisableWitness( /*enable:*/false );
}
cellHasWitness_ = false;
pBufferedClientBundle_.reset();
// A lot of code omitted
}
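The difference between the two close paths can be modelled with a toy state machine. ToyChannel below is a hypothetical illustration of the behaviour described above and not the Mercury channel implementation: a condemned channel lingers until its outstanding packets are acknowledged, while a destroyed one is dropped at once.

```cpp
#include <cstddef>

class ToyChannel
{
public:
    enum State { Connected, Condemned, Destroyed };

    explicit ToyChannel( std::size_t numUnacked ) :
        state_( Connected ), numUnacked_( numUnacked ) {}

    // Graceful close: stay alive until outstanding packets are acked.
    void shutDown()
    {
        if (state_ != Connected) return;
        state_ = Condemned;
        this->finishIfDrained();
    }

    // Immediate close: outstanding packets are abandoned.
    void destroy()
    {
        state_ = Destroyed;
        numUnacked_ = 0;
    }

    // Called when the peer acknowledges one outstanding packet.
    void onAck()
    {
        if (state_ == Destroyed || numUnacked_ == 0) return;
        --numUnacked_;
        this->finishIfDrained();
    }

    State state() const { return state_; }
    std::size_t numUnacked() const { return numUnacked_; }

private:
    void finishIfDrained()
    {
        if (state_ == Condemned && numUnacked_ == 0)
        {
            state_ = Destroyed;
        }
    }

    State state_;
    std::size_t numUnacked_;
};

// Same branch as in detachFromClient.
void closeClientChannel( ToyChannel & channel, bool shouldCondemn )
{
    if (shouldCondemn)
    {
        channel.shutDown();
    }
    else
    {
        channel.destroy();
    }
}
```

The graceful path is what allows the final loggedOff message sent by logOffClient to reach the client reliably.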
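The old pClientChannel_ is set to NULL. In addition, if the cell entity currently has a witness (cellHasWitness_) and messages can still be sent to the cell, sendEnableDisableWitness(false) tells the RealEntity on the CellApp to stop sending messages to the client.
After Proxy::onClientDeath has called logOffClient, the onClientDeath callback is invoked on the Proxy's Python script to tell the script object that the client has disconnected. The script layer can then call self.destroy(), which maps to Base::py_destroy, to destroy the Proxy object: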
PY_KEYWORD_METHOD_DECLARE( py_destroy )
PY_METHOD( destroy )
/**
* This method destroys this object when the script says so.
*/
PyObject * Base::py_destroy( PyObject * args, PyObject * kwargs )
{
if (PyTuple_Size( args ) != 0)
{
PyErr_SetString( PyExc_TypeError, "Only expecting keyword arguments" );
return NULL;
}
if (this->hasCellEntity() || this->isGetCellPending())
{
PyErr_SetString( PyExc_ValueError,
"Still has cell entity. Use Base.destroyCellEntity" );
return NULL;
}
if (isDestroyed_)
{
PyErr_SetString( PyExc_ValueError, "Base entity already destroyed" );
return NULL;
}
static char * keywords[] =
{
const_cast< char *> ( "deleteFromDB" ),
const_cast< char *> ( "writeToDB" ),
NULL
};
PyObject * pDeleteFromDB = NULL;
PyObject * pWriteToDB = NULL;
if (!PyArg_ParseTupleAndKeywords( args, kwargs, "|OO", keywords,
&pDeleteFromDB, &pWriteToDB ))
{
return NULL;
}
bool deleteFromDB = (pDeleteFromDB != NULL) ?
PyObject_IsTrue( pDeleteFromDB ) : false;
bool writeToDB = (pWriteToDB != NULL) ?
PyObject_IsTrue( pWriteToDB ) : this->hasWrittenToDB();
if (pWriteToDB && !writeToDB && BaseApp::instance().pSqliteDB())
{
// Writes lost due to flip-floping
SECONDARYDB_WARNING_MSG( "Base::py_destroy: %s %d destroyed with "
"writeToDB=False. All writes to the secondary "
"database will be lost.\n", pType_->name(), id_ );
}
this->destroy( deleteFromDB, writeToDB );
Py_RETURN_NONE;
}
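When destroy is called on the Proxy object, the logOffFromDB parameter defaults to true, which sets the WRITE_LOG_OFF bit in WriteDBFlags.
Base::writeToDB(flags) is then called with flags containing WRITE_LOG_OFF, provided the entity has been written to the database before (hasWrittenToDB()):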
void destroy( bool deleteFromDB, bool writeToDB, bool logOffFromDB = true );
/**
* This method destroys this base.
*/
void Base::destroy( bool deleteFromDB, bool writeToDB, bool logOffFromDB )
{
IF_NOT_MF_ASSERT_DEV( !isDestroyed_ )
{
return;
}
if (inDestroy_)
{
return;
}
inDestroy_ = true;
Script::call( PyObject_GetAttrString( this, "onDestroy" ),
PyTuple_New( 0 ), "onDestroy", true );
// TRACE_MSG( "Base(%d)::destroy: deleteFromDB=%d, writeToDB=%d\n",
// id_, deleteFromDB, writeToDB );
keepAliveTimerHandle_.cancel();
// Inform our backup that we've been destroyed.
const Mercury::Address backupAddr =
BaseApp::instance().backupAddrFor( this->id() );
if (backupAddr != Mercury::Address::NONE)
{
Mercury::ChannelSender sender( BaseApp::getChannel( backupAddr ) );
BaseAppIntInterface::stopBaseEntityBackupArgs::start(
sender.bundle() ).entityID = this->id();
}
if (this->hasWrittenToDB())
{
WriteDBFlags flags = 0;
if (logOffFromDB)
{
flags |= WRITE_LOG_OFF;
}
if (deleteFromDB)
{
flags |= WRITE_DELETE_FROM_DB;
}
else if (writeToDB)
{
flags |= WRITE_BASE_CELL_DATA;
}
this->writeToDB( flags );
}
BaseApp::instance().pGlobalBases()->onBaseDestroyed( this );
this->discard();
inDestroy_ = false;
}
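The flag selection in Base::destroy can be isolated into a small pure function. The following is a minimal sketch, not BigWorld code: the bit values and the helper name destroyFlags are assumptions made up for illustration, and a return value of 0 stands for "writeToDB is not called at all".

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical bit values, chosen only for this sketch.
using WriteDBFlags = std::uint8_t;
const WriteDBFlags WRITE_BASE_CELL_DATA = 1 << 0;
const WriteDBFlags WRITE_LOG_OFF        = 1 << 1;
const WriteDBFlags WRITE_DELETE_FROM_DB = 1 << 2;

// Mirrors the branch structure of Base::destroy. Returns 0 when the
// entity was never written to the database, i.e. nothing is sent.
WriteDBFlags destroyFlags( bool hasWrittenToDB, bool deleteFromDB,
	bool writeToDB, bool logOffFromDB = true )
{
	WriteDBFlags flags = 0;
	if (!hasWrittenToDB)
	{
		return flags;
	}
	if (logOffFromDB)
	{
		flags |= WRITE_LOG_OFF;
	}
	if (deleteFromDB)
	{
		flags |= WRITE_DELETE_FROM_DB;   // deletion wins over writing
	}
	else if (writeToDB)
	{
		flags |= WRITE_BASE_CELL_DATA;
	}
	return flags;
}
```

Note that deleteFromDB takes precedence over writeToDB, and that WRITE_LOG_OFF is set independently of both.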
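writeToDB uses flags to decide whether and how to write to the database. When flags contains WRITE_LOG_OFF, the entity has been written to the database before, and the caller does not expect a reply of its own, the reply handler for the write message is set to LogOffReplyHandler: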
/**
* This method writes the entity to the database.
*/
bool Base::writeToDB( WriteDBFlags flags, WriteToDBReplyStructPtr pReplyStruct,
PyObjectPtr pCellData, DatabaseID explicitDatabaseID )
{
// many lines omitted
bool isLogOff =(flags & WRITE_LOG_OFF);
if (!shouldWriteToSecondary || isLogOff)
{
BaseApp & baseApp = BaseApp::instance();
DatabaseID dbID = (flags & WRITE_EXPLICIT_DBID) ?
explicitDatabaseID : databaseID_;
if (dbID == PENDING_DATABASE_ID)
{
DEBUG_MSG( "Base::writeToDB: %s %d is still pending initial "
"database write.\n",
this->pType()->name(), id_ );
// Set the dbID to 0, which will distribute this operation to
// the Alpha DBApp.
dbID = 0;
}
const DBAppGateway & dbApp = baseApp.dbAppGatewayFor( dbID );
if (dbApp.address().isNone())
{
ERROR_MSG( "Base::writeToDB: No DBApp is available, "
"data for %" PRI64 " has been lost\n", dbID );
return false;
}
Mercury::Channel & channel = baseApp.getChannel( dbApp.address() );
std::auto_ptr< Mercury::Bundle > pBundle( channel.newBundle() );
BinaryOStream * pStream = pBundle.get();
bool shouldSetToPending = false;
// We expect a reply if we are getting a database id. That is, this is
// the first time that we are being written to the database.
if (pReplyStruct->expectsReply() ||
(!isLogOff && !this->hasWrittenToDB()))
{
shouldSetToPending = (databaseID_ == 0);
pBundle->startRequest( DBAppInterface::writeEntity,
new WriteEntityReplyHandler( this, pReplyStruct ) );
}
else if (isLogOff && this->hasWrittenToDB())
{
// Owned by LogOffReplyHandler
MemoryOStream * pMemoryStream = new MemoryOStream;
pStream = pMemoryStream;
pBundle->startRequest( DBAppInterface::writeEntity,
new LogOffReplyHandler( this->id(), pMemoryStream ) );
}
else
{
pBundle->startMessage( DBAppInterface::writeEntity );
}
if (flags & WRITE_EXPLICIT_DBID)
{
MF_ASSERT_DEV( (flags & WRITE_EXPLICIT_DBID) && (databaseID_ == 0) );
MF_ASSERT_DEV( (flags & WRITE_EXPLICIT_DBID) && (explicitDatabaseID != 0) );
}
*pStream << flags << this->pType()->id() << dbID << this->id();
if (shouldSetToPending)
{
// This needs to be done after databaseID_ is streamed on.
databaseID_ = PENDING_DATABASE_ID;
}
if (!this->addToStream( flags, *pStream, pCellData ))
{
return false;
}
if (flags & WRITE_BASE_CELL_DATA)
{
*pStream << BaseApp::instance().time();
}
// If the stream is not already on the bundle, place it on now.
if (pStream != pBundle.get())
{
pBundle->addBlob(
static_cast< MemoryOStream * >( pStream )->data(),
static_cast< MemoryOStream * >( pStream )->size() );
}
persistentSize = pBundle->size();
channel.send( pBundle.get() );
}
}
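The three-way choice of reply handler in writeToDB is easy to misread, so here it is reduced to a decision function. This is only a sketch: ReplyKind and chooseReplyKind are hypothetical names, and the three booleans stand for pReplyStruct->expectsReply(), the WRITE_LOG_OFF bit, and hasWrittenToDB().

```cpp
#include <cassert>

// Which kind of message Base::writeToDB starts on the bundle.
enum class ReplyKind
{
	WriteEntityReply,  // startRequest with WriteEntityReplyHandler
	LogOffReply,       // startRequest with LogOffReplyHandler
	NoReply            // plain startMessage, fire and forget
};

ReplyKind chooseReplyKind( bool expectsReply, bool isLogOff,
	bool hasWrittenToDB )
{
	// A reply is needed if the caller asked for one, or if this is the
	// first write and a database ID has to come back.
	if (expectsReply || (!isLogOff && !hasWrittenToDB))
	{
		return ReplyKind::WriteEntityReply;
	}
	if (isLogOff && hasWrittenToDB)
	{
		return ReplyKind::LogOffReply;
	}
	return ReplyKind::NoReply;
}
```

In the destroy path described above, expectsReply is false and the entity has been written before, so the LogOffReply case is the one taken.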
When DBApp::writeEntity handles this request, it passes flags on to a WriteEntityHandler, which is responsible for actually carrying out the writeEntity request:
/**
* This method handles the writeEntity mercury message.
*/
void DBApp::writeEntity( const Mercury::Address & srcAddr,
const Mercury::UnpackedMessageHeader & header,
BinaryIStream & data )
{
AUTO_SCOPED_PROFILE( "writeEntity" );
WriteDBFlags flags;
data >> flags;
// if this fails then the calling component had no need to call us
MF_ASSERT( flags &
(WRITE_BASE_CELL_DATA | WRITE_LOG_OFF | WRITE_AUTO_LOAD_MASK) );
EntityDBKey ekey( 0, 0 );
data >> ekey.typeID >> ekey.dbID;
// TRACE_MSG( "DBApp::writeEntity: %lld flags=%i\n",
// ekey.dbID, flags );
bool isOkay = this->getEntityDefs().isValidEntityType( ekey.typeID );
if (!isOkay)
{
ERROR_MSG( "DBApp::writeEntity: Invalid entity type %d\n",
ekey.typeID );
if (header.replyID != Mercury::REPLY_ID_NONE)
{
Mercury::ChannelSender sender( DBApp::getChannel( srcAddr ) );
sender.bundle().startReply( header.replyID );
sender.bundle() << isOkay << ekey.dbID;
}
}
else
{
EntityID entityID;
data >> entityID;
WriteEntityHandler* pHandler =
new WriteEntityHandler( ekey, entityID, flags,
header.replyID, srcAddr );
if (flags & WRITE_DELETE_FROM_DB)
{
pHandler->deleteEntity();
}
else
{
pHandler->writeEntity( data, entityID );
}
}
}
When WriteEntityHandler::writeEntity performs the write and finds WRITE_LOG_OFF in flags, it calls putEntity with the fourth argument removeBaseMailbox set to true, meaning that the entity's BaseMailbox record should be removed:
/**
* This method writes the entity data into the database.
*
* @param data Stream should be currently at the start of the entity's
* data.
* @param entityID The entity's base mailbox object ID.
*/
void WriteEntityHandler::writeEntity( BinaryIStream & data, EntityID entityID )
{
BinaryIStream * pStream = NULL;
UpdateAutoLoad updateAutoLoad =
(flags_ & WRITE_AUTO_LOAD_YES) ? UPDATE_AUTO_LOAD_TRUE :
(flags_ & WRITE_AUTO_LOAD_NO) ? UPDATE_AUTO_LOAD_FALSE:
UPDATE_AUTO_LOAD_RETAIN;
if (flags_ & WRITE_BASE_CELL_DATA)
{
pStream = &data;
}
if (flags_ & WRITE_LOG_OFF)
{
this->putEntity( pStream, updateAutoLoad,
/* pBaseMailbox: */ NULL,
/* removeBaseMailbox: */ true );
}
else if (ekey_.dbID == 0 ||(flags_ & WRITE_EXPLICIT_DBID))
{
// New entity is checked out straight away
baseRef_.init( entityID, srcAddr_, EntityMailBoxRef::BASE,
ekey_.typeID );
this->putEntity( pStream, updateAutoLoad, &baseRef_ );
}
else
{
this->putEntity( pStream, updateAutoLoad );
}
// When putEntity() completes onPutEntityComplete() is called.
}
/**
* This method is invoked by WriteEntityHandler::writeEntity to pass through
* a putEntity request to the database implementation.
*/
void WriteEntityHandler::putEntity( BinaryIStream * pStream,
UpdateAutoLoad updateAutoLoad,
EntityMailBoxRef * pBaseMailbox,
bool removeBaseMailbox )
{
DBApp::instance().putEntity( ekey_, entityID_,
pStream, pBaseMailbox, removeBaseMailbox,
flags_ & WRITE_EXPLICIT_DBID,
updateAutoLoad, *this );
}
/**
* This method is meant to be called instead of IDatabase::putEntity() so that
* we can muck around with stuff before passing it to IDatabase.
*/
void DBApp::putEntity( const EntityKey & entityKey,
EntityID entityID,
BinaryIStream * pStream,
EntityMailBoxRef * pBaseMailbox,
bool removeBaseMailbox,
bool putExplicitID,
UpdateAutoLoad updateAutoLoad,
IDatabase::IPutEntityHandler& handler )
{
// Update mailbox for dead BaseApps.
if (this->hasMailboxRemapping() && pBaseMailbox)
{
// Update mailbox for dead BaseApps.
this->remapMailbox( *pBaseMailbox );
}
pDatabase_->putEntity( entityKey, entityID,
pStream, pBaseMailbox, removeBaseMailbox,
putExplicitID, updateAutoLoad, handler );
}
/**
* Override from IDatabase
*/
void MySqlDatabase::putEntity( const EntityKey & entityKey,
EntityID entityID,
BinaryIStream * pStream,
const EntityMailBoxRef * pBaseMailbox,
bool removeBaseMailbox,
bool putExplicitID,
UpdateAutoLoad updateAutoLoad,
IPutEntityHandler & handler )
{
const EntityTypeMapping * pEntityTypeMapping =
entityTypeMappings_[ entityKey.typeID ];
if (pEntityTypeMapping == NULL)
{
ERROR_MSG( "MySqlDatabase::putEntity: Entity with id \'%d\' is invalid."
" Aborting. Please remove from entities.xml or fix def"
" and script of this entity. ", entityKey.typeID );
handler.onPutEntityComplete( false, entityKey.dbID );
return;
}
// Note: gameTime is provided to PutEntityTask via the stream
pBufferedEntityTasks_->addBackgroundTask(
new PutEntityTask( pEntityTypeMapping,
entityKey.dbID, entityID,
pStream, pBaseMailbox, removeBaseMailbox, putExplicitID,
updateAutoLoad, handler ) );
}
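When the PutEntityTask runs and sees that removeBaseMailbox is true, it calls entityTypeMapping_.removeLogOnRecord to delete the entity's BaseMailbox record from the bigworldLogOns table. This is exactly the counterpart of the record added by setBaseEntityLocation at login: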
else if (removeBaseMailbox_)
{
entityTypeMapping_.removeLogOnRecord( conn, dbID_ );
}
namespace
{
const Query removeLogOnQuery(
"DELETE FROM bigworldLogOns WHERE databaseID = ? AND typeID = ?" );
}
void EntityTypeMapping::removeLogOnRecord( MySql & conn, DatabaseID id ) const
{
removeLogOnQuery.execute( conn, id, this->getDatabaseTypeID(), NULL );
}
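The role of the bigworldLogOns table can be modelled with an in-memory map keyed by (databaseID, typeID), matching the WHERE clause of the DELETE statement above. This is a minimal sketch under assumptions: LogOnTable and BaseLocation are hypothetical stand-ins, and the real table stores a full base mailbox rather than the two simplified fields used here.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Simplified stand-in for one row of bigworldLogOns.
struct BaseLocation
{
	std::string baseAppAddr;
	int entityID;
};

class LogOnTable
{
public:
	typedef std::pair< long long, int > Key;  // (databaseID, typeID)

	// Analogue of setBaseEntityLocation at login time.
	void setLocation( long long dbID, int typeID, const BaseLocation & loc )
	{
		rows_[ Key( dbID, typeID ) ] = loc;
	}

	// Analogue of removeLogOnRecord; returns true if a row was deleted.
	bool remove( long long dbID, int typeID )
	{
		return rows_.erase( Key( dbID, typeID ) ) == 1;
	}

	// Returns nullptr when the entity is not currently checked out.
	const BaseLocation * find( long long dbID, int typeID ) const
	{
		std::map< Key, BaseLocation >::const_iterator it =
			rows_.find( Key( dbID, typeID ) );
		return (it == rows_.end()) ? nullptr : &it->second;
	}

private:
	std::map< Key, BaseLocation > rows_;
};
```

A later login for the same (databaseID, typeID) finds a row only if the previous session never reached the log-off write, which is what distinguishes a relogin from a fresh login.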
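Client disconnection flow
Client connections in BigWorld typically use connectionless UDP rather than connection-oriented TCP, so strictly speaking there is no such thing as a dropped connection. Reliable UDP, however, emulates a reliable connection at the logic layer: when sent data goes without an ACK for too long, the channel times out, is marked unusable, and is destroyed: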
/**
* This method "destroys" this channel. It should be considered similar to
* delete pChannel except that there may be other references remaining.
*/
void Channel::destroy()
{
IF_NOT_MF_ASSERT_DEV( !isDestroyed_ )
{
return;
}
inactivityTimerHandle_.cancel();
this->doDestroy();
isDestroyed_ = true;
pNetworkInterface_->onChannelGone( this );
if (this->pChannelListener())
{
this->pChannelListener()->onChannelGone( *this );
}
this->decRef();
}
On destruction the channel notifies its bound ChannelListener through onChannelGone. When a Proxy is bound to a client Channel, BaseApp::addProxy is executed, which registers the current BaseApp as that channel's ChannelListener:
/**
* This method adds a proxy from this manager.
*/
void BaseApp::addProxy( Proxy * pNewProxy )
{
Mercury::ChannelPtr pChannel = pNewProxy->pClientChannel();
Mercury::Address address = pChannel->addr();
address.salt = (pChannel->isTCP() ? 1 : 0);
TRACE_MSG( "BaseApp: Adding proxy %u at %s\n",
pNewProxy->id(), pChannel->c_str() );
// set ourselves in the map from ip address to proxy
Proxy *& rpProxy = proxies_[address];
// some code omitted
pChannel->pChannelListener( this );
}
So when the client channel times out, BaseApp::onChannelGone is executed:
/*
* Override from Mercury::ChannelListener.
*/
void BaseApp::onChannelGone( Mercury::Channel & channel )
{
TRACE_MSG( "BaseApp::onChannelGone: %s\n", channel.c_str() );
Proxies::iterator iProxy = proxies_.find( channel.addr() );
if (iProxy != proxies_.end())
{
ProxyPtr pProxy = iProxy->second;
pProxy->onClientDeath( CLIENT_DISCONNECT_TIMEOUT );
}
}
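The timeout-to-listener chain described above can be sketched with a toy channel that tracks when it last heard from its peer. All names here (ToyChannel, ToyChannelListener, CountingListener, tick, onAck) are hypothetical; the real Mercury::Channel drives this from an inactivity timer rather than an explicit tick call.

```cpp
#include <cassert>

class ToyChannel;

struct ToyChannelListener
{
	virtual ~ToyChannelListener() {}
	virtual void onChannelGone( ToyChannel & channel ) = 0;
};

class ToyChannel
{
public:
	ToyChannel( double now, double timeout ) :
		lastHeard_( now ), timeout_( timeout ),
		isDestroyed_( false ), pListener_( nullptr ) {}

	void pListener( ToyChannelListener * p ) { pListener_ = p; }
	void onAck( double now ) { lastHeard_ = now; }
	bool isDestroyed() const { return isDestroyed_; }

	// Called periodically. Destroys the channel once the peer has been
	// silent for longer than the timeout, and notifies the listener
	// exactly once.
	void tick( double now )
	{
		if (!isDestroyed_ && (now - lastHeard_ > timeout_))
		{
			isDestroyed_ = true;
			if (pListener_)
			{
				pListener_->onChannelGone( *this );
			}
		}
	}

private:
	double lastHeard_;
	double timeout_;
	bool isDestroyed_;
	ToyChannelListener * pListener_;
};

// Plays the role of BaseApp: counts how many channels have gone.
struct CountingListener : ToyChannelListener
{
	int numGone = 0;
	void onChannelGone( ToyChannel & ) override { ++numGone; }
};
```

The isDestroyed_ guard matters: without it a later tick would report the same dead channel again.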
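The function above looks up the Proxy bound to the channel and reports the client disconnect through onClientDeath. onClientDeath was covered in an earlier section; the difference from a voluntary logout is that shouldCondemnClient is set to false here, so the Channel is destroyed immediately instead of waiting for outstanding ACKs: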
if (shouldExpectClient || this->hasClient())
{
// If we don't expect a client, we don't care about finalising
// acks from the channel
bool shouldCondemnClient = shouldExpectClient;
if (reason == CLIENT_DISCONNECT_TIMEOUT ||
reason == CLIENT_DISCONNECT_RATE_LIMITS_EXCEEDED )
{
// We don't care about finalising acks from a timed-out channel
shouldCondemnClient = false;
}
this->logOffClient( shouldCondemnClient );
}
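The condemn-or-destroy decision in the excerpt above boils down to a two-input function. In this sketch the enum is a reduced stand-in: CLIENT_DISCONNECT_CLIENT_REQUESTED is a hypothetical name for a voluntary logout, and only the two reasons that appear in the excerpt are taken from the source.

```cpp
#include <cassert>

enum ClientDisconnectReason
{
	CLIENT_DISCONNECT_CLIENT_REQUESTED,     // hypothetical: voluntary logout
	CLIENT_DISCONNECT_TIMEOUT,
	CLIENT_DISCONNECT_RATE_LIMITS_EXCEEDED
};

// True  -> condemn the channel and let outstanding ACKs drain.
// False -> destroy the channel immediately.
bool shouldCondemnChannel( ClientDisconnectReason reason,
	bool shouldExpectClient )
{
	if (reason == CLIENT_DISCONNECT_TIMEOUT ||
		reason == CLIENT_DISCONNECT_RATE_LIMITS_EXCEEDED)
	{
		// The channel is already in a bad state; waiting for ACKs
		// that will never arrive is pointless.
		return false;
	}
	return shouldExpectClient;
}
```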
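Client relogin (session takeover) flow
As the code above shows, once the client connection times out, the Proxy destroys the Channel directly through detachFromClient, discarding all unacknowledged messages and all later messages destined for the client. So there is no reconnect mechanism like the one in mosaic_game here, only disconnection. How does the client get back onto the server? By logging in again and taking over the existing session, kicking the old connection.
On relogin the client sends exactly the same data as in a normal login, so the early stages of the takeover flow are identical to a normal login. The only difference is that when DBApp finishes loading the Entity's data, the address of the previously bound Proxy is returned along with it: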
/**
* DBApp::GetEntityHandler override
*/
void LoginHandler::onGetEntityCompleted( bool isOK,
const EntityDBKey & entityKey,
const EntityMailBoxRef * pBaseEntityLocation )
{
// some error-handling code omitted
entityKey_ = entityKey;
if (pBaseEntityLocation != NULL)
{
baseRef_ = *pBaseEntityLocation;
pBaseRef_ = &baseRef_;
}
else
{
pBaseRef_ = NULL;
}
if (pStrmDbID_)
{
// Means ekey.dbID was 0 when we called prepareCreateEntityBundle()
// Now fix up everything.
*pStrmDbID_ = entityKey.dbID;
}
this->checkOutEntity();
}
The Proxy address is stored in pBaseEntityLocation. When this pointer is not nullptr, the Proxy for this Entity already exists, and baseRef_ is set to that Proxy's address. checkOutEntity then branches on whether pBaseRef_ is set:
/**
* This function checks out the login entity. Must be called after
* entity has been successfully retrieved from the database.
*/
void LoginHandler::checkOutEntity()
{
if ((pBaseRef_ == NULL) &&
DBApp::instance().onStartEntityCheckout( entityKey_ ))
{
// logic that asks the BaseAppMgr to create the proxy (covered earlier), omitted here
}
else // Checked out
{
DBApp::instance().onLogOnLoggedOnUser( entityKey_.typeID,
entityKey_.dbID, pParams_, clientAddr_, replyAddr_, replyID_,
pBaseRef_, dataForClient_, dataForBaseEntity_ );
delete this;
}
}
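The condition in checkOutEntity combines two checks, and the second one only runs when the first passes. The sketch below assumes that onStartEntityCheckout returns false when a checkout for the same entity is already in flight; that behaviour is not shown in the excerpt. CheckoutTracker, LoginPath and choosePath are hypothetical names.

```cpp
#include <cassert>
#include <set>

enum class LoginPath { CreateNewProxy, LogOnToExisting };

// Assumed behaviour of onStartEntityCheckout / its completion callback.
class CheckoutTracker
{
public:
	// Returns true only for the first caller for a given entity.
	bool onStart( long long dbID )
	{
		return inProgress_.insert( dbID ).second;
	}
	void onComplete( long long dbID ) { inProgress_.erase( dbID ); }

private:
	std::set< long long > inProgress_;
};

LoginPath choosePath( bool hasBaseRef, CheckoutTracker & tracker,
	long long dbID )
{
	// Short-circuit: an entity that already has a base is never
	// registered as a new checkout.
	if (!hasBaseRef && tracker.onStart( dbID ))
	{
		return LoginPath::CreateNewProxy;
	}
	return LoginPath::LogOnToExisting;
}
```

Under this assumption, two logins racing for the same never-checked-out entity do not both create a Proxy: the second one falls into the else branch.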
Our focus is the else branch, i.e. logging on to an already logged-on entity via onLogOnLoggedOnUser:
/**
* This method is called when there is a log on request for an entity that is
* already logged on.
*/
void DBApp::onLogOnLoggedOnUser( EntityTypeID typeID, DatabaseID dbID,
LogOnParamsPtr pParams,
const Mercury::Address & clientAddr, const Mercury::Address & replyAddr,
Mercury::ReplyID replyID, const EntityMailBoxRef * pExistingBase,
const BW::string & dataForClient, const BW::string & dataForBaseEntity )
{
// some fault-tolerance code omitted
INFO_MSG( "DBApp::onLogOnLoggedOnUser: name = %s. databaseID = "
"%" FMT_DBID ". typeID = %d. entityID = %d. BaseApp = %s\n",
pParams->username().c_str(),
dbID,
typeID,
pExistingBase->id,
pExistingBase->addr.c_str() );
// Log on to existing base
Mercury::ChannelSender sender(
DBApp::getChannel( pExistingBase->addr ) );
Mercury::Bundle & bundle = sender.bundle();
bundle.startRequest( BaseAppIntInterface::logOnAttempt,
new RelogonAttemptHandler( pExistingBase->type(), dbID,
replyAddr, replyID, pParams, clientAddr, dataForClient ) );
bundle << pExistingBase->id;
bundle << clientAddr;
bundle << pParams->encryptionKey();
bundle << dataForBaseEntity;
}
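After handling some error cases, onLogOnLoggedOnUser packs the new client's information into a logOnAttempt request and sends it to the existing Proxy's address, pExistingBase->addr, registering a RelogonAttemptHandler as the reply handler for this RPC.
When the BaseApp receives this RPC, it first finds the Proxy by id and then calls the script-layer callback onLogOnAttempt to decide whether to accept the takeover. The rest of the flow depends on the result: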
/**
* This method handles a message from the database telling us that a player is
* trying to log on to an active entity.
*/
void BaseApp::logOnAttempt( const Mercury::Address & srcAddr,
const Mercury::UnpackedMessageHeader & header,
BinaryIStream & data )
{
MF_ASSERT( srcAddr == this->dbApp().addr() );
EntityID id;
Mercury::Address clientAddr;
data >> id >> clientAddr;
Base * pBase = bases_.findEntity( id );
if (pBase == NULL)
{
WARNING_MSG( "BaseApp::logOnAttempt: No base %u\n", id );
Mercury::ChannelSender sender( this->dbApp().channel() );
sender.bundle().startReply( header.replyID );
sender.bundle() << BaseAppIntInterface::LOG_ON_ATTEMPT_WAIT_FOR_DESTROY;
return;
}
// We never expect this to happen, but reject the login just in case
IF_NOT_MF_ASSERT_DEV( pBase->isProxy() )
{
ERROR_MSG( "BaseApp::logOnAttempt:"
"%u is not a proxy, rejecting login attempt.\n",
id );
Mercury::ChannelSender sender( this->dbApp().channel() );
sender.bundle().startReply( header.replyID );
sender.bundle() << BaseAppIntInterface::LOG_ON_ATTEMPT_REJECTED;
return;
}
Proxy * pProxy = static_cast< Proxy * >( pBase );
PyObject * pResult;
BW::string encryptionKey;
data >> encryptionKey;
BW::string logOnData;
data >> logOnData;
bool tookControl;
PyObject* pFunction = PyObject_GetAttrString( pBase, "onLogOnAttempt" );
if (pFunction)
{
// script-layer takeover decision code omitted for now
}
else
{
NOTICE_MSG( "BaseApp::logOnAttempt: "
"Rejecting relogon attempt for entity %u. "
"No script method %s.onLogOnAttempt\n",
id, pBase->pType()->name() );
PyErr_Clear();
tookControl = false;
}
// post-processing once the takeover decision has been made
}
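The omitted script-layer handling of onLogOnAttempt mostly deals with odd edge cases, such as a traceback raised by the script call or the entity being in the middle of destruction. The outcome is stored in tookControl, which says whether the takeover is allowed, and the subsequent logic branches on it: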
if (tookControl)
{
if (!clientAddr.ip)
{
// only clear base's client channel if this is not a web login
// (check clientAddr.ip)
INFO_MSG( "BaseApp::logOnAttempt: "
"For %u from web login.\n", pBase->id() );
}
else
{
// some error checks omitted
pProxy->logOffClient( /* shouldCondemnChannel */ true );
}
pProxy->completeReLogOnAttempt( clientAddr, header.replyID,
encryptionKey );
}
else
{
NOTICE_MSG( "BaseApp::logOnAttempt: "
"Rejecting relogin attempt. " \
"Have not taken control.\n" );
Mercury::ChannelSender sender( this->dbApp().channel() );
Mercury::Bundle & bundle = sender.bundle();
bundle.startReply( header.replyID );
bundle << BaseAppIntInterface::LOG_ON_ATTEMPT_REJECTED;
}
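Putting the two excerpts of BaseApp::logOnAttempt together, the reply code sent back to the DBApp depends on four conditions. The following sketch collapses them into one function; LogOnAttemptResult and decideLogOnAttempt are hypothetical names, and the script result is simplified to a boolean, whereas the omitted script handling may produce more outcomes than shown here.

```cpp
#include <cassert>

enum class LogOnAttemptResult
{
	Rejected,        // LOG_ON_ATTEMPT_REJECTED
	TookControl,     // LOG_ON_ATTEMPT_TOOK_CONTROL
	WaitForDestroy   // LOG_ON_ATTEMPT_WAIT_FOR_DESTROY
};

LogOnAttemptResult decideLogOnAttempt( bool baseExists, bool isProxy,
	bool hasScriptCallback, bool scriptAccepts )
{
	if (!baseExists)
	{
		// The base is already gone on this BaseApp; the caller should
		// wait for the destruction to reach the database.
		return LogOnAttemptResult::WaitForDestroy;
	}
	if (!isProxy)
	{
		return LogOnAttemptResult::Rejected;
	}
	// Without an onLogOnAttempt script method the relogin is rejected.
	const bool tookControl = hasScriptCallback && scriptAccepts;
	return tookControl ? LogOnAttemptResult::TookControl
		: LogOnAttemptResult::Rejected;
}
```

The default is conservative: an entity type that does not implement onLogOnAttempt can never be taken over.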
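Interestingly, if the takeover is allowed, logOffClient is executed immediately to close the old client connection. If the old connection still exists, a ClientInterface::loggedOffArgs message is sent to that client before closing to tell it that it has been logged off. Note that the reason field is always 0 in this code (the comment says it is not yet used), so the old client is not actually told that it was kicked by a relogin: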
/**
* Send the client a disconnect message, then disconnect.
*
* @param shouldCondemnChannel True if any client channel should be condemned,
* false if it should be immediately destroyed.
* Generally true unless it was a client time-out
* or other situation where the client channel is
* probably no longer in a good state.
*/
void Proxy::logOffClient( bool shouldCondemnChannel )
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
// Abort any pending downloads, and then tell the script about it after
// the client has gone.
DownloadCallbacks callbacks;
dataDownloads_.abortDownloads( callbacks );
if (this->isClientConnected())
{
// Send a message to the client telling it that it has been logged off.
// The reason parameter is not yet used.
Mercury::Bundle & bundle = pClientChannel_->bundle();
ClientInterface::loggedOffArgs::start( bundle ).reason = 0;
this->sendBundleToClient();
}
this->detachFromClient( shouldCondemnChannel );
callbacks.triggerCallbacks( this );
}
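After the logged-off message is sent, detachFromClient cleans up the old connection state. Note that shouldCondemnChannel is true here, so the channel is condemned: it is kept aside until all pending data has been delivered, instead of being force-closed as in the timeout case.
After detachFromClient, completeReLogOnAttempt tells DBApp that the old connection has been torn down. It uses prepareForLogin to generate a new LoginKey and adds the (LoginKey, Proxy) pair to the PendingLogin map: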
/**
* This method completes a re-log-on attempt.
*
* @param clientAddress The client address that is now expected to log on.
* @param replyID The reply ID for the request for onLogOnAttempt.
* @param encryptionKey The encryption key to be used.
*/
void Proxy::completeReLogOnAttempt( const Mercury::Address & clientAddress,
Mercury::ReplyID replyID,
const BW::string & encryptionKey )
{
INFO_MSG( "Proxy::completeReLogOnAttempt( %s %d ): "
"Waiting to accept re-log-on attempt from %s\n",
this->pType()->name(),
id_,
clientAddress.c_str() );
BaseApp & baseApp = BaseApp::instance();
SessionKey loginKey = 0;
if (clientAddress.ip)
{
loginKey = this->prepareForLogin( clientAddress );
}
this->encryptionKey( encryptionKey );
Mercury::ChannelSender sender( baseApp.dbApp().channel() );
Mercury::Bundle & bundle = sender.bundle();
bundle.startReply( replyID );
bundle << BaseAppIntInterface::LOG_ON_ATTEMPT_TOOK_CONTROL;
// This needs to match what the BaseAppMgr sends back to the
// database.
bundle << baseApp.extInterface().address();
bundle << this->baseEntityMailBoxRef();
bundle << loginKey;
pPendingReLogOn_.reset( NULL );
}
When DBApp receives this reply, the RelogonAttemptHandler created earlier runs. It tells LoginApp that the login succeeded, attaching the existing Proxy's address and the new LoginKey:
/*
* Mercury::ReplyMessageHandler override.
*/
void RelogonAttemptHandler::handleMessage(
const Mercury::Address & source,
Mercury::UnpackedMessageHeader & header,
BinaryIStream & data,
void * arg )
{
uint8 result;
data >> result;
if (hasAborted_)
{
DEBUG_MSG( "RelogonAttemptHandler: DBID %" FMT_DBID ": "
"Ignoring BaseApp reply, re-logon attempt has been aborted.\n",
ekey_.dbID );
// Delete ourselves as we have been aborted.
delete this;
return;
}
switch (result)
{
case BaseAppIntInterface::LOG_ON_ATTEMPT_TOOK_CONTROL:
{
INFO_MSG( "RelogonAttemptHandler: DBID %" FMT_DBID ": "
"It's taken over.\n",
ekey_.dbID );
Mercury::Address proxyAddr;
data >> proxyAddr;
EntityMailBoxRef baseRef;
data >> baseRef;
replyBundle_.startReply( replyID_ );
// Assume success.
replyBundle_ << (uint8)LogOnStatus::LOGGED_ON;
replyBundle_ << proxyAddr;
replyBundle_.transfer( data, data.remainingLength() );
replyBundle_ << dataForClient_;
DBApp::instance().interface().sendOnExistingChannel( replyAddr_,
replyBundle_ );
delete this;
break;
}
// other error cases omitted
default:
CRITICAL_MSG( "RelogonAttemptHandler: DBID %" FMT_DBID ": "
"Invalid result %d\n",
ekey_.dbID,
int(result) );
delete this;
break;
}
}
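For LoginApp, a successful takeover is handled exactly like a successful login, so that code is not repeated here. After receiving the login-success message, the client sends a normal login request to the BaseApp hosting the Proxy. The LoginKey is used to look up the matching PendingLogin and its Proxy, and attachToClient rebinds the client connection. That completes the takeover flow: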
/**
* This method is called by a client to make initial contact with the BaseApp.
* It should be called after the client has logged in via the LoginApp.
*/
void LoginHandler::login( Mercury::NetworkInterface & networkInterface,
const Mercury::Address& srcAddr,
const Mercury::UnpackedMessageHeader& header,
const BaseAppExtInterface::baseAppLoginArgs & args )
{
PendingLogins::iterator pendingIter = pPendingLogins_->find( args.key );
if (pendingIter == pPendingLogins_->end())
{
INFO_MSG( "LoginHandler::login(%s): "
"No pending login for loginKey %u. Attempt = %u\n",
srcAddr.c_str(), args.key, args.numAttempts );
// Bad bundle so break out of dispatching the rest.
header.breakBundleLoop();
return;
}
const PendingLogin & pending = pendingIter->second;
SmartPointer<Proxy> pProxy = pending.pProxy();
// some code omitted
this->updateStatistics( srcAddr, pending.addrFromLoginApp(), args.numAttempts );
pPendingLogins_->erase( pendingIter );
if (pProxy->attachToClient( srcAddr, header.replyID,
header.pChannel.get() ))
{
INFO_MSG( "LoginHandler::login: "
"%u attached from %s. Attempt %u\n",
pProxy->id(), srcAddr.c_str(), args.numAttempts );
}
}
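The pending-login bookkeeping works like a one-time ticket: prepareForLogin issues a key, and LoginHandler::login consumes it. Here is a minimal sketch of that idea. ToyPendingLogins, add and take are hypothetical names, proxies are represented by plain integer IDs, and the sequential key generation is a simplification that a real server should not copy, since login keys need to be unpredictable.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

typedef std::uint32_t SessionKey;

class ToyPendingLogins
{
public:
	ToyPendingLogins() : nextKey_( 0 ) {}

	// Analogue of prepareForLogin: registers the proxy under a new key.
	SessionKey add( int proxyID )
	{
		SessionKey key = ++nextKey_;  // sketch only: predictable keys
		pending_[ key ] = proxyID;
		return key;
	}

	// Analogue of the lookup in LoginHandler::login: returns the proxy
	// ID and erases the entry, or returns -1 for an unknown key.
	int take( SessionKey key )
	{
		std::map< SessionKey, int >::iterator it = pending_.find( key );
		if (it == pending_.end())
		{
			return -1;
		}
		int proxyID = it->second;
		pending_.erase( it );
		return proxyID;
	}

private:
	SessionKey nextKey_;
	std::map< SessionKey, int > pending_;
};
```

Because the entry is erased on first use, a replayed baseAppLogin with the same key finds nothing and is dropped, just like the "No pending login" branch above.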
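Now consider whether this takeover flow can suffer from the same problem as in mosaic_game, where data meant for the old client ends up at the new client. On reflection it can: data sent by the RealEntity on the CellApp toward the old client may reach the Proxy only after the takeover has finished and the new client is bound. Delivering it straight to the new ClientChannel would cause exactly that problem. BigWorld accounts for this with a handshake that notifies the RealEntity after the client is switched; the details are covered in the next section.
Client scene entry flow
The previous four sections covered the client's login, disconnection, and takeover flows in detail, along with the groundwork done to keep messaging between the Proxy and the client working. But establishing the message channel is only the beginning: the channel is needed to synchronise the character's server-side state to the client, and at this point the client has no entity objects at all. So once the client knows the Proxy has accepted the login, it sends an enableEntities request to tell the server that it can start pushing entity data: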
/**
* This method handles a request from the client to enable or disable updates
* from the cell. It forwards this message on to the cell.
*/
void Proxy::enableEntities()
{
DEBUG_MSG( "Proxy::enableEntities(%u)\n", id_ );
// if this is the first we've heard of it, then send the client the props
// it shares with us, call the base script...
if (!basePlayerCreatedOnClient_)
{
this->addCreateBasePlayerToChannelBundle();
this->sendExposedForReplayClientPropertiesToCell();
if (pBufferedClientBundle_.get())
{
// Make sure the BasePlayer arrives before the buffered messages
this->sendBundleToClient();
// TODO: This is because Channel::send will not send arbitrary
// bundles on a channel with a bundle primer present.
// Would it work to copy the messages into the clientBundle instead?
pClientChannel_->bundlePrimer( NULL );
Mercury::Bundle & bundle = pClientChannel_->bundle();
pBufferedClientBundle_->applyToBundle( bundle );
pClientChannel_->send( &bundle );
pClientChannel_->bundlePrimer( &clientBundlePrimer_ );
pBufferedClientBundle_.reset();
}
}
// ... and tell the cell the game is on
if (!entitiesEnabled_)
{
entitiesEnabled_ = true;
if (this->hasCellEntity())
{
this->sendEnableDisableWitness( /*enable:*/true );
// remove ProxyPusher
if (pProxyPusher_ != NULL)
{
delete pProxyPusher_;
pProxyPusher_ = NULL;
}
}
else
{
// Add a proxy pusher because we don't have a cell to do it.
if (pProxyPusher_ == NULL)
{
pProxyPusher_ = new ProxyPusher( this );
}
}
}
if (shouldRunCallbackOnEntitiesEnabled_)
{
// call the script and let it have its naughty way with the client
Script::call( PyObject_GetAttrString( this, "onEntitiesEnabled" ),
PyTuple_New( 0 ), "", true );
shouldRunCallbackOnEntitiesEnabled_ = false;
}
}
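This function has two parts. The first uses addCreateBasePlayerToChannelBundle to send the current player entity's data to the client in a ClientInterface::createBasePlayer message so that the client can create its PlayerEntity. The data was filled in by DBApp when the Proxy was created and lives in the Proxy's properties. When packing, EntityDescription::FROM_BASE_TO_CLIENT_DATA is used to select the properties the client needs, which are collected into a Python dict and added to the bundle: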
/**
* This method adds the createBasePlayer message to the given bundle
*
* It should immediately follow a successful login or full
* entity reset, so the client is never operating without
* a Base Player entity.
* Note: When this method is called,
* Proxy::sendExposedForReplayClientPropertiesToCell() should be called
* together at the same time.
*
* @param bundle The Mercury::Bundle to add the message to
*/
void Proxy::addCreateBasePlayerToChannelBundle()
{
DEBUG_MSG( "Proxy::addCreateBasePlayerToChannelBundle(%u): "
"Creating player on client\n",
id_ );
MF_ASSERT( pClientChannel_ != NULL );
MF_ASSERT( shouldRunCallbackOnEntitiesEnabled_ == false );
MF_ASSERT( basePlayerCreatedOnClient_ == false );
Mercury::Bundle & bundle = pClientChannel_->bundle();
bundle.startMessage( ClientInterface::createBasePlayer );
bundle << id_ << pType_->description().clientIndex();
this->addAttributesToStream( bundle,
EntityDescription::FROM_BASE_TO_CLIENT_DATA );
shouldRunCallbackOnEntitiesEnabled_ = true;
basePlayerCreatedOnClient_ = true;
}
/**
* This method writes attributes of the entity related to the given dataDomains
* to stream.
*/
bool Base::addAttributesToStream( BinaryOStream & stream, int dataDomains )
{
const EntityDescription & entityDesc = this->pType()->description();
ScriptObject self( this, ScriptObject::FROM_BORROWED_REFERENCE );
ScriptDict attrs = createDictWithAllProperties(entityDesc,
self, pEntityDelegate_.get(), dataDomains);
if (!attrs)
{
return false;
}
return entityDesc.addDictionaryToStream( attrs, stream, dataDomains );
}
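Selecting properties by data domain can be pictured as a bitmask filter over the property descriptions. The sketch below is only an illustration of that idea: the TOY_ constants, PropertyDesc and selectProperties are all hypothetical, property values are reduced to int, and the real EntityDescription domain constants and matching rules may differ from the "all requested bits present" rule assumed here.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical domain bits, not the real EntityDescription values.
const int TOY_BASE_DATA   = 0x1;
const int TOY_CLIENT_DATA = 0x2;
const int TOY_CELL_DATA   = 0x4;

struct PropertyDesc
{
	std::string name;
	int domains;   // bitwise OR of the TOY_ constants above
};

// Collects the values of all properties whose domains include every
// bit requested in dataDomains.
std::map< std::string, int > selectProperties(
	const std::vector< PropertyDesc > & descs,
	const std::map< std::string, int > & values,
	int dataDomains )
{
	std::map< std::string, int > out;
	for (std::size_t i = 0; i < descs.size(); ++i)
	{
		const PropertyDesc & desc = descs[ i ];
		std::map< std::string, int >::const_iterator it =
			values.find( desc.name );
		if (((desc.domains & dataDomains) == dataDomains) &&
			(it != values.end()))
		{
			out[ desc.name ] = it->second;
		}
	}
	return out;
}
```

With a request of TOY_BASE_DATA | TOY_CLIENT_DATA, a base-only property and a cell-and-client property are both left out, so only data that lives on the base and is visible to the client reaches the bundle.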
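The second part uses sendEnableDisableWitness to tell the corresponding RealEntity (if one has been created) that the new client now allows synchronisation of other objects in its AOI, so data for all AOI-visible objects can be sent to the client: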
/**
* This method sends an enable or disable witness message to our cell entity.
*
* @param enable whether to enable or disable the witness
* @param isRestore is this an explicit witness enable/disable send as a
* result of a restore cell entity?
*/
void Proxy::sendEnableDisableWitness( bool enable, bool isRestore )
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
Mercury::Bundle & bundle = this->cellBundle();
bundle.startRequest( CellAppInterface::enableWitness,
new EnableWitnessReplyHandler( this ) );
bundle << id_;
bundle << isRestore;
++numOutstandingEnableWitness_;
cellHasWitness_ = enable;
if (enable)
{
bundle << BaseAppConfig::bytesPerPacketToClient();
}
// else just send an empty stream
this->sendToCell(); // send it straight away
}
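detachFromClient, seen earlier, also calls sendEnableDisableWitness, but with false. This tells the RealEntity to pause AOI synchronisation to save bandwidth and CPU, since anything sent would be dropped anyway because there is no client.
Note the increment in sendEnableDisableWitness, ++numOutstandingEnableWitness_, which tracks how many CellAppInterface::enableWitness requests have not been replied to yet. When a reply arrives, EnableWitnessReplyHandler decrements the counter: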
/**
* This reply handler is used to keep track of how many outstanding
* enableWitness there are for each proxy.
*/
class EnableWitnessReplyHandler :
public Mercury::ShutdownSafeReplyMessageHandler
{
public:
EnableWitnessReplyHandler( ProxyPtr pProxy ) :
pProxy_( pProxy )
{
}
void handleMessage( const Mercury::Address& /*srcAddr*/,
Mercury::UnpackedMessageHeader& /*header*/,
BinaryIStream& data, void * /*arg*/ )
{
this->onReply();
}
void onReply()
{
pProxy_->onEnableWitnessAck();
delete this;
}
private:
ProxyPtr pProxy_;
};
/**
* This method is called when there is confirmation that the witness has been
* created.
*/
void Proxy::onEnableWitnessAck()
{
MF_ASSERT( numOutstandingEnableWitness_ > 0 );
--numOutstandingEnableWitness_;
}
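If numOutstandingEnableWitness_ is non-zero, the RealEntity on the CellApp has not yet acknowledged the client connection state change we sent. Client data arriving from the RealEntity in that window can be assumed to be destined for the old client rather than the new one, and should be discarded. Performing this handshake check in every interface that delivers data to the client is therefore enough to keep post-takeover data meant for the old client from reaching the new client: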
bool Proxy::hasOutstandingEnableWitness() const
{ return numOutstandingEnableWitness_ != 0; }
/**
* This message is the cell telling us that it has now sent us all the
* updates for the given tick, and we should forward them on to the client.
*/
void Proxy::sendToClient()
{
// Do nothing. It's for an old client.
if (!this->hasOutstandingEnableWitness())
{
this->sendBundleToClient();
}
}
/**
* This method forwards this message to the client.
*/
void Proxy::sendMessageToClientHelper( BinaryIStream & data, bool isReliable )
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
if (this->hasOutstandingEnableWitness())
{
// Do nothing. It's for an old client.
data.finish();
return;
}
// many lines omitted
}
/**
* This method forwards this message to the client (reliably)
*/
#define STRUCT_CLIENT_MESSAGE_FORWARDER( MESSAGE ) \
void Proxy::MESSAGE( const BaseAppIntInterface::MESSAGE##Args & args ) \
{ \
if (this->hasOutstandingEnableWitness()) \
{ \
/* Do nothing. It's for an old client. */ \
} \
/* many lines omitted */ \
} \
/**
* This method tells us about a change in the status of one of our wards.
* We modify our internal list, then forward the message on to the client.
*/
void Proxy::modWard( const BaseAppIntInterface::modWardArgs & args )
{
if (this->hasOutstandingEnableWitness())
{
// TODO: Should make sure that the wards are reset when given a new
// client.
ERROR_MSG( "Proxy::modWard( %d ): Has outstanding enableWitness\n",
id_ );
return;
}
// many lines omitted
}
// other interfaces that deliver data to the client omitted
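The outstanding-request counter can be exercised in isolation to see why it filters stale data. This is a minimal sketch, not the real Proxy: ToyProxy and forwardFromCell are hypothetical names, and it assumes that messages from the cell and the enableWitness replies arrive at the base in the order the cell sent them.

```cpp
#include <cassert>
#include <string>
#include <vector>

class ToyProxy
{
public:
	ToyProxy() : numOutstanding_( 0 ) {}

	// Analogue of sendEnableDisableWitness: one more request in flight.
	void sendEnableDisableWitness( bool /*enable*/ )
	{
		++numOutstanding_;
	}

	// Analogue of onEnableWitnessAck: the cell has processed one request.
	void onEnableWitnessAck()
	{
		assert( numOutstanding_ > 0 );
		--numOutstanding_;
	}

	// Data arriving from the cell. It is dropped while any request is
	// unacknowledged, because it may have been produced before the cell
	// learned about the client switch.
	bool forwardFromCell( const std::string & msg )
	{
		if (numOutstanding_ != 0)
		{
			return false;
		}
		delivered_.push_back( msg );
		return true;
	}

	const std::vector< std::string > & delivered() const
	{
		return delivered_;
	}

private:
	int numOutstanding_;
	std::vector< std::string > delivered_;
};
```

A takeover issues two requests back to back (disable for the old client, enable for the new one), so data only flows again after both acknowledgements have come back. A counter is needed rather than a boolean precisely because several requests can be in flight at once.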
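Once the client's PlayerEntity has been created, it is ready to enter a specific game scene. This logic is not written in the C++ code. Instead, the Base object exposes several interfaces to Python scripts so that the game logic decides the scene-entry parameters itself. The typical interface is createCellEntity: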
/*~ function Base.createCellEntity
* @components{ base }
* <i>createCellEntity</i> makes a request to create an associated entity
* within a cell.
*
* The information used to create the cell entity is stored in the
* cellData property of this entity. This property is a dictionary
* corresponding to the values in the entity's .def file plus a
* "position" and "direction" entries for the entity's position
* and (roll, pitch, yaw) direction as well as optional "spaceID"
* and "templateID" entries. In case if "templateID" entry is present
* the non-spatial properties are not transmitted, instead the "templateID"
* is used on CellApp to populate the cell entity's properties
* from a local storage.
*
* If nearbyMB is not passed in, the "spaceID" entry in cellData is
* used to indicate which space to create the cell entity in.
*
* @param nearbyMB an optional mailbox argument which is used to indicate
* which space to create the cell entity in. Ideally, the two entities
* are near so that it is likely that the correct cell will be found
* immediately. Either the base or the cell mailbox of the nearby entity
* can be used; when using base or cell-via-base mailboxes, the entity
* reference will be passed to the __init__() method of the cell entity so
* the nearby entity's position can be used to set the new cell entity's
* position.
*/
PY_METHOD( createCellEntity )
A similar script interface is createInDefaultSpace. Both are in fact wrappers around the following interface:
/**
* This method creates the cell entity associated with this entity into the
* input space.
*/
bool Base::createInSpace( SpaceID spaceID, const char * pyErrorPrefix )
{
BaseApp & app = BaseApp::instance();
// TODO:
// As an optimisation, try to find a cell entity mailbox for an existing
// base entity that is in the same space.
//
// This is currently not implemented as there is a potential race-condition.
// The entity may currently be in the same space but may be in a different
// space by the time the createCellEntity message arrives.
std::auto_ptr< Mercury::ReplyMessageHandler > pHandler(
this->prepareForCellCreate( pyErrorPrefix ) );
if (!pHandler.get())
{
return false;
}
Mercury::Channel & channel = BaseApp::getChannel( app.cellAppMgrAddr() );
// We don't use the channel's own bundle here because the streaming might
// fail and the message might need to be aborted halfway through.
std::auto_ptr< Mercury::Bundle > pBundle( channel.newBundle() );
pBundle->startRequest( CellAppMgrInterface::createEntity, pHandler.get() );
*pBundle << spaceID;
// stream on the entity channel version
*pBundle << this->channel().version();
*pBundle << false; /* isRestore */
// See if we can add the necessary data to the bundle
if (!this->addCellCreationData( *pBundle, pyErrorPrefix ))
{
isCreateCellPending_ = false;
isGetCellPending_ = false;
return false;
}
channel.send( pBundle.get() );
pHandler.release(); // Handler deletes itself on callback.
return true;
}
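This function sends a createEntity request to the CellAppMgr. The arguments include the identifier of the space to enter, spaceID, and addCellCreationData packs the information stored on the Base for creating the CellEntity into the bundle. The implementation of addCellCreationData is fairly long, so here is just a summary of what gets packed:
- The position and direction at which to enter the scene, and whether to snap to the ground
- The address of the BaseApp hosting the current Base
- The template ID used by the Base's property system; a property template fills in default values for some properties, so only the template ID has to be sent rather than the defaults themselves
- The client-server properties in the Base's property system that are marked exposedForReplay
Once packed, the data is sent to the CellAppMgr, which uses the spaceID to find the Space object and then creates the RealEntity inside that Space: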
/**
* This method creates a new entity on the system. It finds the appropriate
* cell to create the entity on and does so.
*
* @todo Currently adds the entity to the first space but the correct space
* will need to be specified in the message.
*/
void CellAppMgr::createEntity( const Mercury::Address& srcAddr,
const Mercury::UnpackedMessageHeader& header,
BinaryIStream & data )
{
SpaceID spaceID;
data >> spaceID;
Space * pSpace = this->findSpace( spaceID );
if (pSpace == NULL)
{
ERROR_MSG( "CellAppMgr::createEntity: Invalid space id %u\n", spaceID );
// Rely on createEntityCommon to send the error reply.
}
else
{
pSpace->hasHadEntities( true );
}
this->createEntityCommon( pSpace, srcAddr, header, data );
}
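createEntityCommon finds a suitable Cell based on the spawn position and forwards the createEntity request to the CellApp that hosts this Cell. If no Cell can be found, it replies to the requester directly with NULL_ENTITY_ID: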
/**
* This private method is used by createEntity and createSpace to implement
* their common functionality.
*/
void CellAppMgr::createEntityCommon( Space * pSpace,
const Mercury::Address& srcAddr,
const Mercury::UnpackedMessageHeader& header,
BinaryIStream & data )
{
Mercury::ChannelVersion channelVersion = Mercury::SEQ_NULL;
data >> channelVersion;
bool isRestore;
data >> isRestore;
StreamHelper::AddEntityData entityData;
StreamHelper::removeEntity( data, entityData );
const Vector3 & pos = entityData.position;
CellData * pCellData = pSpace ? pSpace->findCell( pos.x, pos.z ) : NULL;
if (pCellData)
{
Mercury::Bundle & bundle = pCellData->cellApp().bundle();
bundle.startRequest( CellAppInterface::createEntity,
new CreateEntityReplyHandler( srcAddr, header.replyID ) );
bundle << pCellData->space().id();
bundle << channelVersion;
bundle << isRestore;
StreamHelper::addEntity( bundle, entityData );
bundle.transfer( data, data.remainingLength() );
pCellData->cellApp().send();
}
else
{
ERROR_MSG( "CellAppMgr::createEntity: "
"No cell found to place entity\n" );
data.finish();
Mercury::ChannelSender sender( CellAppMgr::getChannel( srcAddr ) );
Mercury::Bundle & bundle = sender.bundle();
bundle.startReply( header.replyID );
bundle << NULL_ENTITY_ID;
}
}
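The routing decision in createEntityCommon can be illustrated with a minimal sketch. It assumes, purely for illustration, that each cell covers an axis-aligned rectangle on the x/z plane and that cells are searched linearly; the excerpt does not show how the engine actually partitions a space, and CellRectSketch, SpaceSketch and routeCreateEntity are hypothetical names.

```cpp
#include <cassert>
#include <vector>

// Hypothetical cell description: a rectangle on the x/z plane plus an
// identifier for the CellApp that owns it.
struct CellRectSketch
{
    float minX, maxX, minZ, maxZ;
    int cellAppID;

    bool contains( float x, float z ) const
    {
        return x >= minX && x < maxX && z >= minZ && z < maxZ;
    }
};

class SpaceSketch
{
public:
    void addCell( const CellRectSketch & cell ) { cells_.push_back( cell ); }

    // Returns the cell covering (x, z), or nullptr if none does.
    const CellRectSketch * findCell( float x, float z ) const
    {
        for (std::size_t i = 0; i < cells_.size(); ++i)
        {
            if (cells_[ i ].contains( x, z ))
            {
                return &cells_[ i ];
            }
        }
        return nullptr;
    }

private:
    std::vector< CellRectSketch > cells_;
};

// Mirrors the branch in createEntityCommon: a null space or a position
// outside every cell both end in the failure path (here: -1).
int routeCreateEntity( const SpaceSketch * pSpace, float x, float z )
{
    const CellRectSketch * pCell = pSpace ? pSpace->findCell( x, z ) : nullptr;
    return pCell ? pCell->cellAppID : -1;
}
```

Note that only x and z take part in the lookup, matching the findCell( pos.x, pos.z ) call in the excerpt: the height of the spawn point does not influence which cell is chosen.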
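When the target Cell receives the createEntity request, it uses the supplied parameters to actually create an Entity, and returns the unique ID of the new Entity to CellAppMgr as the result: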
/**
* This method creates a new real entity on this cell according to the
* parameters in 'data'.
*
* @param srcAddr The address from which this request originated.
* @param header The mercury header
* @param data The data stream
* @param pNearbyEntity A pointer to a nearby entity to use during entity
* creation.
*
* @see createEntityInternal
*/
void Cell::createEntity( const Mercury::Address& srcAddr,
const Mercury::UnpackedMessageHeader& header,
BinaryIStream & data,
EntityPtr pNearbyEntity )
{
Mercury::ChannelVersion channelVersion = Mercury::SEQ_NULL;
data >> channelVersion;
bool isRestore;
data >> isRestore;
EntityPtr pEntity = this->createEntityInternal( data, ScriptDict(),
isRestore, channelVersion, pNearbyEntity );
if (isRestore)
{
return;
}
if (header.replyID == Mercury::REPLY_ID_NONE)
{
WARNING_MSG( "Cell::createEntity: Handling non-request createEntity\n" );
return;
}
if (pNearbyEntity == NULL)
{
// Only CellAppMgr sends without a nearby entity.
CellAppMgrGateway & cellAppMgr = CellApp::instance().cellAppMgr();
cellAppMgr.bundle().startReply( header.replyID );
cellAppMgr.bundle() << (pEntity ? pEntity->id() : NULL_ENTITY_ID);
cellAppMgr.send();
return;
}
// Some code omitted
}
Inside createEntityInternal, once the Entity object has been created, Entity::initReal tries to create the corresponding RealEntity, and addRealEntity then adds the Entity to the current Cell:
// Build up the Entity structure
EntityPtr pNewEntity = space_.newEntity( id, entityTypeID );
if (!pNewEntity)
{
return NULL;
}
MF_ASSERT( pNewEntity->nextInChunk() == NULL );
MF_ASSERT( pNewEntity->prevInChunk() == NULL );
MF_ASSERT( pNewEntity->pChunk() == NULL );
Entity::callbacksPermitted( false ); // {
if (!pNewEntity->initReal( data, properties, isRestore, channelVersion,
pNearbyEntity ))
{
// Error-handling code omitted
return NULL;
}
// And add it to our list of reals
// TODO: If init method destroyed or teleported this entity
// then we should not do this (and destroying/offloading
// would have caused an assert when it removed the entity anyway)
//
// Delay sending the currentCell message to the base so that it will be on
// the same bundle as the backup. This guarantees that there is no window
// between losing cellData and getting the first backup data.
this->addRealEntity( pNewEntity.get(), /*shouldSendNow:*/false );
INFO_MSG( "Cell::createEntityInternal: %s %-10s (%u)\n",
isRestore ? "Restored" : "New",
pNewEntity->pType() ?
pNewEntity->pType()->name() : "INVALID",
pNewEntity->id() );
Entity::population().notifyObservers( *pNewEntity );
// Note: This causes the currentCell message from above to be sent too.
if (pNewEntity->pReal())
{
pNewEntity->pReal()->backup(); // Send backup to BaseApp immediately.
}
return pNewEntity;
An Entity in a Space owns either a RealEntity or a GhostEntity, which is why the comment below states that exactly one of initReal and initGhost must be called, immediately after the Entity is constructed:
/**
* This method should be called on a newly created entity to make it a real
* entity. Either this method or initGhost should be called immediately after
* the constructor.
*
* @see initGhost
*/
bool Entity::initReal( BinaryIStream & data, const ScriptDict & properties,
bool isRestore,
Mercury::ChannelVersion channelVersion,
EntityPtr pNearbyEntity )
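Since this is the first time the entity enters the Space, what gets created here is necessarily a RealEntity.
When the CreateEntityReplyHandler on CellAppMgr receives the creation result, its only job is to forward the reply to the CreateCellEntityHandler of the Base that issued the createEntity call: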
/*
* This method handles the reply from the cellapp.
*/
void CreateCellEntityHandler::handleMessage( const Mercury::Address & source,
Mercury::UnpackedMessageHeader & header,
BinaryIStream & data, void * arg )
{
EntityID entityID;
data >> entityID;
// MF_ASSERT( entityID == pBase_->id() );
if (entityID == pBase_->id())
{
// INFO_MSG( "CreateCellEntityHandler::handleMessage: "
// "Cell entity (%lu) created\n",
// entityID );
pBase_->cellCreationResult( true );
}
else
{
WARNING_MSG( "CreateCellEntityHandler::handleMessage: "
"Failed to create associated cell entity for %u.\n"
"\tResponse was entity id %u\n", pBase_->id(), entityID );
pBase_->cellCreationResult( false );
}
delete this;
}
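On success, the returned EntityID is always the EntityID stored on the current Base, and any other value is treated as a failure. In the success case cellCreationResult is called to record that the RealEntity has been created in the Cell: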
/**
* This method is called when we find out whether or not creating an
* entity on the cell succeeded.
*/
void Base::cellCreationResult( bool success )
{
if (isDestroyed_)
{
return;
}
// isCreateCellPending_ is also clear before onGetCell is called
if (!isCreateCellPending_)
{
// This may occur in a very rare situation. If the cell was created via
// the CellAppMgr, it's possible (although unlikely) for the CellApp to
// have responded with success but the CellAppMgr responds with failure.
// This occurrs when the CellApp crashes while a
// CreateEntityReplyHandler is outstanding but after setCurrentCell.
if (!success)
{
ERROR_MSG( "Base::cellCreationResult: Ignoring failure after "
"setCurrentCell\n" );
}
return;
}
isCreateCellPending_ = false;
if (!success)
{
isGetCellPending_ = false;
Script::call( PyObject_GetAttrString( this, "onCreateCellFailure" ),
PyTuple_New( 0 ), "onCreateCellFailure", true );
DEBUG_MSG( "Base::cellCreationResult: Failed for %u\n", id_ );
}
}
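The interplay of the two pending flags can be sketched as a small state machine. This is a simplified model, not the engine's Base class: BaseFlagsSketch and its event log are hypothetical, and it assumes that starting a create request raises both flags (the excerpt only shows them being cleared).

```cpp
#include <cassert>
#include <string>
#include <vector>

class BaseFlagsSketch
{
public:
    // Assumption: issuing a create request raises both flags.
    void beginCreate()
    {
        isCreateCellPending_ = true;
        isGetCellPending_ = true;
    }

    // Models the currentCell notification arriving with a valid address.
    void onCurrentCell()
    {
        isCreateCellPending_ = false;
        isGetCellPending_ = false;
        events_.push_back( "onGetCell" );
    }

    // Models Base::cellCreationResult from the excerpt above.
    void cellCreationResult( bool success )
    {
        if (!isCreateCellPending_)
        {
            // The currentCell notification already settled the request,
            // so a late failure reply is only logged.
            if (!success) events_.push_back( "ignoredFailure" );
            return;
        }
        isCreateCellPending_ = false;
        if (!success)
        {
            isGetCellPending_ = false;
            events_.push_back( "onCreateCellFailure" );
        }
    }

    bool isCreateCellPending() const { return isCreateCellPending_; }
    bool isGetCellPending() const { return isGetCellPending_; }
    const std::vector< std::string > & events() const { return events_; }

private:
    bool isCreateCellPending_ = false;
    bool isGetCellPending_ = false;
    std::vector< std::string > events_;
};
```

Whichever of the two notifications arrives first clears isCreateCellPending_, and the second one then becomes a no-op, which is how the rare "success from the CellApp, failure from the CellAppMgr" case described in the source comment is tolerated.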
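However, cellCreationResult only clears isCreateCellPending_. At this point the Base still knows nothing about the RealEntity that was created. That information is filled in and sent by addRealEntity, which is called from createEntityInternal as shown earlier: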
/**
* This method adds the input entity to the cell's internal list of real
* entities.
*
* It is called from Cell::createEntity and by the Entity itself when it is
* onloaded.
*/
void Cell::addRealEntity( Entity * pEntity, bool shouldSendNow )
{
if (!pEntity->isReal())
{
ERROR_MSG( "Cell::addRealEntity called on ghost entity id %u!\n",
pEntity->id() );
return;
}
pEntity->informBaseOfAddress( CellApp::instance().interface().address(),
this->spaceID(), shouldSendNow );
realEntities_.add( pEntity );
}
informBaseOfAddress writes the address of the current CellApp and the SpaceID of the current space into BaseAppIntInterface::currentCellArgs:
/**
* This method informs the base entity of the address of the cell entity.
*/
void Entity::informBaseOfAddress( const Mercury::Address & addr,
SpaceID spaceID, bool shouldSendNow )
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
// TODO: Move this to RealEntity
MF_ASSERT( this->isReal() );
if (this->hasBase())
{
Mercury::Bundle & bundle = pReal_->channel().bundle();
BaseAppIntInterface::setClientArgs & setClientArgs =
BaseAppIntInterface::setClientArgs::start( bundle );
setClientArgs.id = id_;
// Our base knowing where we are is considered to be critical. In
// particular, if the base doesn't know about this real it may try to
// restore this entity somewhere else following a cellapp crash which
// will cause the !pOtherGhost->isReal() assertion.
BaseAppIntInterface::currentCellArgs & currentCellArgs =
BaseAppIntInterface::currentCellArgs::start(
bundle, Mercury::RELIABLE_CRITICAL );
currentCellArgs.newSpaceID = spaceID;
currentCellArgs.newCellAddr = addr;
if (shouldSendNow)
{
pReal_->channel().send();
}
}
}
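The effect of shouldSendNow can be shown with a small sketch. The comments in createEntityInternal say the currentCell message is deliberately delayed so that it travels on the same bundle as the first backup. The model below assumes a channel that queues messages until send() is called; ChannelSketch and the two free functions are hypothetical, and the setClient message from the real function is left out.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical channel: messages accumulate in a pending bundle and
// leave together when send() is called.
class ChannelSketch
{
public:
    void queue( const std::string & message )
    {
        pending_.push_back( message );
    }

    void send()
    {
        if (pending_.empty()) return;
        sent_.push_back( pending_ );
        pending_.clear();
    }

    // Each inner vector is one bundle that went out on the wire.
    const std::vector< std::vector< std::string > > & sent() const
    {
        return sent_;
    }

private:
    std::vector< std::string > pending_;
    std::vector< std::vector< std::string > > sent_;
};

// Models informBaseOfAddress: queue currentCell, flush only on request.
void informBaseOfAddressSketch( ChannelSketch & channel, bool shouldSendNow )
{
    channel.queue( "currentCell" );
    if (shouldSendNow) channel.send();
}

// Models the backup call at the end of createEntityInternal, which
// flushes whatever is still queued on the channel.
void backupSketch( ChannelSketch & channel )
{
    channel.queue( "backup" );
    channel.send();
}
```

With shouldSendNow set to false nothing leaves the channel until the backup is queued, so the Base sees the new cell address and the first backup in one bundle, with no window in between.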
When the Base receives the currentCell RPC, it calls setCurrentCell with the new SpaceID and CellApp address:
/**
* This method is used to inform the base that the cell we send to has changed.
*/
void Base::currentCell( const Mercury::Address & srcAddr,
const Mercury::UnpackedMessageHeader & header,
const BaseAppIntInterface::currentCellArgs & args )
{
this->setCurrentCell( args.newSpaceID, args.newCellAddr,
&srcAddr );
}
/**
* This method is used to inform the base that the cell we send to has changed.
*/
void Base::setCurrentCell( SpaceID spaceID,
const Mercury::Address & cellAppAddr,
const Mercury::Address * pSrcAddr, bool shouldReset )
{
// Make sure that we are still around after any script call.
PyObjectPtr pThis = this;
// If we're losing our cell entity, flush the channel.
if (cellAppAddr == Mercury::Address::NONE)
{
if (pChannel_->isEstablished())
{
pChannel_->send();
// We have to reset the channel here because we might get another
// cell entity later on and that cell entity will expect the channel
// to be in a reset state. This will put the channel into the
// 'wantsFirstPacket_' state, so even if a packet arrives from the
// old app, it will be dropped.
pChannel_->reset( Mercury::Address::NONE, false );
}
}
// If we're getting a cell entity or offloading, just switch the address.
else
{
// Usually we don't need to manually switch address here, since the
// channel has the autoSwitchToSrcAddr flag enabled and it has already
// been done by Mercury. We still need to do this when called from
// emergencySetCurrentCell() however.
if (shouldReset)
{
pChannel_->reset( cellAppAddr );
}
else
{
pChannel_->setAddress( cellAppAddr );
}
}
// The rest of the function is omitted for now
}
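setCurrentCell points pChannel_ at the current cellAppAddr, after which the Base and the RealEntity can communicate over pChannel_. Recall that a Proxy talks to its client over pClientChannel_, which should not be confused with the pChannel_ used here.
Once pChannel_ has been set, setCurrentCell notifies the other components that the RealEntity has been created. The following is the code that was omitted above: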
if (pCellEntityMailBox_ != NULL)
{
bool hadCell = (pCellEntityMailBox_->address().ip != 0);
bool haveCell = (cellAppAddr.ip != 0);
pCellEntityMailBox_->address( pChannel_->addr() );
spaceID_ = spaceID;
if (hadCell != haveCell)
{
isGetCellPending_ = false;
if (haveCell)
{
pCellData_ = NULL;
}
else
{
cellBackupData_.clear();
}
// inform the proxy that the cell entity has gone
// (even if a new one is requested by the script method below)
if (this->isProxy() && hadCell)
{
((Proxy*)this)->cellEntityDestroyed( pSrcAddr );
}
if (haveCell)
{
isCreateCellPending_ = false;
// There might still be stuff waiting for a valid IP address to
// come along
pChannel_->send();
}
// call the script method notifying it of this event
char * methodName = (char*)(haveCell ? "onGetCell" : "onLoseCell");
PyObject * pMethod = PyObject_GetAttrString( this, methodName );
if (pMethod == NULL)
PyErr_Clear();
// inform the proxy that the cell entity is ready
if (this->isProxy() && this->hasCellEntity())
{
((Proxy*)this)->cellEntityCreated();
}
if (pMethod)
{
Script::call( pMethod, PyTuple_New( 0 ), methodName );
}
// notify the delegate about this event
if (pEntityDelegate_)
{
if (haveCell)
{
//TODO: maybe we should add the handling of this condition to the delegate,
// but meanwhile it's not needed/handled.
//
//pEntityDelegate_->onGetCell();
}
else
{
pEntityDelegate_->onLoseCell();
}
}
}
}
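The hadCell / haveCell comparison at the heart of this code can be isolated into a small function. The sketch below is a simplification under the same rule the excerpt uses (an IP of 0 means "no cell"); CellEventSketch and classifyCellChange are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>

enum class CellEventSketch
{
    None,     // Address changed or stayed the same, but cell presence did not.
    GetCell,  // No cell before, a cell now: onGetCell is called.
    LoseCell  // A cell before, none now: onLoseCell is called.
};

// Classifies a change of the cell address the same way the excerpt does:
// only a transition between "no cell" and "has a cell" produces an event.
CellEventSketch classifyCellChange( uint32_t oldIp, uint32_t newIp )
{
    const bool hadCell = (oldIp != 0);
    const bool haveCell = (newIp != 0);

    if (hadCell == haveCell)
    {
        return CellEventSketch::None;
    }
    return haveCell ? CellEventSketch::GetCell : CellEventSketch::LoseCell;
}
```

A move from one CellApp to another keeps both values non-zero, so it produces no script callback, whereas the first creation of the cell entity is the transition from 0 to a valid address.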
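Because setCurrentCell handles many different cases in which the address of the RealEntity changes, we focus for now on the case where the RealEntity is created for the first time, namely the cellEntityCreated function: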
/**
* This method deals with our cell entity being created.
*/
void Proxy::cellEntityCreated()
{
AUTO_SCOPED_THIS_ENTITY_PROFILE;
if (!entitiesEnabled_) return;
MF_ASSERT( this->hasClient() );
MF_ASSERT( this->hasCellEntity() );
// create the witness
this->sendEnableDisableWitness( /*enable:*/true );
// get rid of the proxy pusher now that the witness will be sending us
// regular updates (the self motivator should definitely be there).
MF_ASSERT( pProxyPusher_ != NULL );
delete pProxyPusher_;
pProxyPusher_ = NULL;
}
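sendEnableDisableWitness was introduced earlier, in the discussion of a new login taking over an existing session. Its job is to tell the RealEntity to start synchronising itself and its AOI to the client.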