BigWorld 的进程生命周期
启动流程
在Bigworld里,每种进程都有一个专门的main.cpp文件来定义其启动时的main函数,其文件内容大同小异,只是bwMainT< >的模板参数不同。其模板参数就是进程的App类,例如CellApp、BaseApp、LoginApp等:
// server\dbappmgr\main.cpp
int BIGWORLD_MAIN( int argc, char * argv[] )
{
return bwMainT< DBAppMgr >( argc, argv );
}
// server\loginapp\main.cpp
int BIGWORLD_MAIN( int argc, char * argv[] )
{
return bwMainT< LoginApp >( argc, argv );
}
这里的BIGWORLD_MAIN宏定义了进程的main函数,来封装一些通用的处理流程,例如初始化Bigworld的资源和配置,解析命令行参数等,最后调用bwMainT< >来启动进程的主循环:
#define BIGWORLD_MAIN \
bwMain( int argc, char * argv[] ); \
int main( int argc, char * argv[] ) \
{ \
BW_SYSTEMSTAGE_MAIN(); \
BWResource bwresource; \
BWResource::init( argc, (const char **)argv ); \
BWConfig::init( argc, argv ); \
bwParseCommandLine( argc, argv ); \
return bwMain( argc, argv ); \
} \
int bwMain
这个bwMainT< >模板函数负责进程运行环境的准备,包括初始化全局事件驱动器EventDispatcher、网络资源interface以及注册进程退出的信号处理函数signalProcessor。初始化好了这三个重要组件之后,继续利用模板函数doBWMainT< >来启动进程的主循环:
// lib\server\bwservice.hpp
template <class SERVER_APP>
int bwMainT( int argc, char * argv[], bool shouldLog = true )
{
Mercury::EventDispatcher dispatcher;
// Find the internal interface IP by querying BWMachined
if (!Mercury::MachineDaemon::queryForInternalInterface(
ServerApp::discoveredInternalIP ))
{
WARNING_MSG( "bwMainT: "
"Unable to determine internal interface via BWMachineD query.\n" );
}
BW::string internalInterfaceName =
getBWInternalInterfaceSetting( SERVER_APP::configPath() );
Mercury::NetworkInterface interface( &dispatcher,
Mercury::NETWORK_INTERFACE_INTERNAL, 0,
internalInterfaceName.c_str() );
SignalProcessor signalProcessor( dispatcher );
BW_MESSAGE_FORWARDER3( SERVER_APP::appName(), SERVER_APP::configPath(),
/*ENABLED=*/shouldLog, dispatcher, interface );
START_MSG( SERVER_APP::appName() );
if (internalInterfaceName != Mercury::NetworkInterface::USE_BWMACHINED)
{
CONFIG_WARNING_MSG( "internalInterface set to '%s' in bw.xml. "
"This option is deprecated. It is recommended to not set this "
"value. The default behaviour is to use the same interface as "
"bwmachined. This is controlled by the broadcast route.\n",
internalInterfaceName.c_str() );
}
int result = doBWMainT< SERVER_APP >( dispatcher, interface, argc, argv );
INFO_MSG( "%s has shut down.\n", SERVER_APP::appName() );
return result;
}
这个doBWMainT的实现就非常简单了,主要就是创建一个SERVER_APP的实例,然后调用其runApp方法来启动进程的主循环:
template <class SERVER_APP>
int doBWMainT( Mercury::EventDispatcher & dispatcher,
Mercury::NetworkInterface & interface,
int argc, char * argv[] )
{
if (!ServerAppConfig::init( SERVER_APP::Config::postInit ))
{
CONFIG_ERROR_MSG( "Failed to initialise configuration options. "
"See earlier error message for details.\n" );
return -1;
}
SERVER_APP serverApp( dispatcher, interface );
serverApp.setBuildDate( __TIME__, __DATE__ );
return serverApp.runApp( argc, argv ) ? EXIT_SUCCESS : EXIT_FAILURE;
}
每种进程都有一个对应的App类,这些App类都继承自ServerApp类。ServerApp类是Bigworld进程管理的核心类,ServerApp::runApp来调用init初始化相关组件:
/**
* This method runs this application.
*/
bool ServerApp::runApp( int argc, char * argv[] )
{
// calculate the clock speed
stampsPerSecond();
bool result = false;
if (this->init( argc, argv ))
{
INFO_MSG( "---- %s is running ----\n", this->getAppName() );
result = this->run();
}
else
{
ERROR_MSG( "Failed to initialise %s\n", this->getAppName() );
}
this->fini();
interface_.prepareForShutdown();
#if ENABLE_PROFILER
g_profiler.fini();
#endif
return result;
}
这里的init方法负责初始化性能监控g_profiler,退出信号处理函数pSignalHandler_,以及FD的最大打开数,并设置网络接口的日志等级:
/**
* Initialisation function.
*
* This needs to be called from subclasses' overrides.
*/
bool ServerApp::init( int argc, char * argv[] )
{
PROFILER_SCOPED( ServerApp_init );
bool runFromMachined = false;
for (int i = 1; i < argc; ++i)
{
if (strcmp( argv[i], "-machined" ))
{
runFromMachined = true;
}
}
INFO_MSG( "ServerApp::init: Run from bwmachined = %s\n",
watcherValueToString( runFromMachined ).c_str() );
#if ENABLE_PROFILER
if (ServerAppConfig::hasHitchDetection())
{
g_profiler.setProfileMode( Profiler::SORT_BY_TIME, false );
}
#endif
pSignalHandler_.reset( this->createSignalHandler() );
// Handle signals
this->enableSignalHandler( SIGINT );
this->raiseFileDescriptorLimit( ServerAppConfig::maxOpenFileDescriptors() );
interface_.verbosityLevel( ServerAppConfig::isProduction() ?
Mercury::NetworkInterface::VERBOSITY_LEVEL_NORMAL :
Mercury::NetworkInterface::VERBOSITY_LEVEL_DEBUG );
return true;
}
在完成了init之后,ServerApp::runApp接下来会调用run方法来启动进程的主循环EventDispatcher::processOnce:
/**
* This is the default implementation of run. Derived classes to override
* this to implement their own functionality.
*/
bool ServerApp::run()
{
mainDispatcher_.processUntilBreak();
this->onRunComplete();
return true;
}
/**
* This method call processContinuously until breakProcessing is called.
*/
void EventDispatcher::processUntilBreak()
{
this->processContinuously();
pErrorReporter_->reportPendingExceptions( true /* reportBelowThreshold */ );
}
/**
* This method processes events continuously until interrupted by a call to
* breakProcessing.
*
* @see breakProcessing
*/
void EventDispatcher::processContinuously()
{
breakProcessing_ = false;
while (!breakProcessing_)
{
this->processOnce( /* shouldIdle */ true );
}
}
在processOnce方法中,会先处理频繁任务processFrequentTasks,然后处理定时器processTimers,随后是性能统计processStats,最后处理网络事件processNetwork:
/**
* This method processes the current events.
*
* @param shouldIdle If set to true, this method will block until the next
* timer is due if there is nothing waiting on the network.
*
* @return The number of network events processed.
*/
int EventDispatcher::processOnce( bool shouldIdle /* = false */ )
{
breakProcessing_ = false;
this->processFrequentTasks();
if (!breakProcessing_)
{
this->processTimers();
}
this->processStats();
if (!breakProcessing_)
{
return this->processNetwork( shouldIdle );
}
return 0;
}
这个processFrequentTasks方法会通过FrequentTasks::process方法来处理所有通过mainDispatcher_.addFrequentTask注册过来的FrequentTask,目前主要用到这个FrequentTask的地方不多,只有mysql,signal和http相关的任务会注册到FrequentTasks中:
/**
* This method calls doTask on all registered FrequentTasks objects.
*/
void EventDispatcher::processFrequentTasks()
{
PROFILER_SCOPED( processFrequentTasks );
pFrequentTasks_->process();
}
// lib\db_storage_mysql\mysql_database.cpp
/**
* This method implements the FrequentTask method.
*/
void MySqlDatabase::doTask()
{
bgTaskManager_.tick();
}
// lib\server\signal_processor.cpp
void SignalProcessor::doTask()
{
this->dispatch();
}
/**
* Handle frequent task trigger.
*/
void SignalProcessor::dispatch()
{
const Signal::Set allSignals( Signal::Set::FULL );
Signal::Blocker blocker( allSignals );
int sigNum = SIGMIN;
while (sigNum <= SIGMAX)
{
if (signals_.isSet( sigNum ))
{
this->dispatchSignal( sigNum );
}
++sigNum;
}
signals_.clearAll();
}
在processTimers方法里会调用TimeQueue::process方法来处理全局计时器队列pTimeQueue_里所有到期的定时器任务,这个方法会返回处理的定时器任务数量,我们可以通过这个数量来判断是否有定时器任务到期:
/**
* This method processes outstanding timers.
*/
void EventDispatcher::processTimers()
{
PROFILER_SCOPED( processTimers );
numTimerCalls_ += pTimeQueue_->process( timestamp() );
}
在processNetwork方法中,会调用Poller::processPendingEvents方法来处理所有等待在网络上的事件,这个方法会返回处理的事件数量,我们可以通过这个数量来判断是否有网络事件发生:
/**
* This method processes any activity on the network.
*
* @param shouldIdle If set to true, this method will block until the next
* timer is due if there is nothing waiting on the network.
* @return Number of file descriptors that triggered handlers.
*/
int EventDispatcher::processNetwork( bool shouldIdle )
{
PROFILER_SCOPED( processNetwork );
// select for network activity until earliest timer
double maxWait = shouldIdle ? this->calculateWait() : 0.0;
return pPoller_->processPendingEvents( maxWait );
}
目前这个Poller的实现有三种, 就是常见的Linux系统上的select,poll和epoll:
SelectPoller调用select来等待文件描述符的相关状态改变,并执行对应的事件处理函数。PollPoller调用poll来等待文件描述符的相关状态改变,并执行对应的事件处理函数。EPoller调用epoll_wait来等待文件描述符的相关状态改变,并执行对应的事件处理函数。
运行的时候根据编译选项会选择不同的Poller实现,默认是SelectPoller,如果编译选项中定义了HAS_POLL,则会选择PollPoller,如果定义了HAS_EPOLL,则会选择EPoller。
/**
* This static method creates an appropriate EventPoller. It may use select or
* epoll.
*/
EventPoller * EventPoller::create()
{
#if defined( HAS_POLL )
return new PollPoller();
#elif defined( HAS_EPOLL )
return new EPoller();
#else // !defined( HAS_EPOLL )
return new SelectPoller();
#endif // defined( HAS_EPOLL )
}
这里的事件循环里并没有处理具体的业务逻辑,只是处理了网络事件和定时器任务。ServerApp的具体子类负责利用这里提供的Task和Timer组件来插入自身的业务逻辑。以最简单的DBApp为例,在其连接到数据库之后会调用initTimers,这里会开启一个固定间隔且重复调用的计时器来驱动GameTick,在其超时处理函数handleTimeout里用来定时调用advanceTime方法来推进时间:
/**
* This method sets up required timers.
*
* @return true on success, false otherwise.
*/
bool DBApp::initTimers()
{
MF_ASSERT( status_.status() < DBStatus::RUNNING );
// A one second timer to check all sorts of things, including whether to
// start the server running if we are waiting for other components to
// be ready.
statusCheckTimer_ = mainDispatcher_.addTimer( 1000000, this,
reinterpret_cast< void * >( TIMEOUT_STATUS_CHECK ),
"StatusCheck" );
// NOTE: DBApp's time is not synchronised with the rest of the cluster.
gameTimer_ = mainDispatcher_.addTimer( 1000000/Config::updateHertz(), this,
reinterpret_cast< void * >( TIMEOUT_GAME_TICK ),
"GameTick" );
return true;
}
/**
* This method handles timer events. It is called every second.
*/
void DBApp::handleTimeout( TimerHandle handle, void * arg )
{
switch (reinterpret_cast< uintptr >( arg ))
{
case TIMEOUT_GAME_TICK:
this->advanceTime();
break;
case TIMEOUT_STATUS_CHECK:
this->checkStatus();
break;
}
}
这个advanceTime由ServerApp这个基类来提供,主要内容就是推进游戏时间,提供游戏的Tick机制,也就是常说的游戏帧的概念。内部会调用onTickPeriod、onEndOfTick、onStartOfTick、onTickProcessingComplete等回调函数,以及调用callUpdatables方法来更新所有Updatable对象。
/**
* This method increments the game time.
*/
void ServerApp::advanceTime()
{
if (lastAdvanceTime_ != 0)
{
double tickPeriod = stampsToSeconds( timestamp() - lastAdvanceTime_ );
this->onTickPeriod( tickPeriod );
}
lastAdvanceTime_ = timestamp();
this->onEndOfTick();
++time_;
#if ENABLE_PROFILER
g_profiler.tick();
#endif
this->onStartOfTick();
this->callUpdatables();
this->onTickProcessingComplete();
}
这里的几个Tick相关函数目前基本都是空实现,主要是为了预留扩展点,方便子类来实现自己的逻辑。只有ServerApp::onTickPeriod提供了一个基础实现,就是对过长的帧时间进行警告并记录:
virtual void onTickPeriod( double tickPeriod );
/*
* This method gives subclasses a chance to act at the end of a tick
* immediately before the current server time is incremented.
*/
virtual void onEndOfTick() {};
/*
* This method gives subclasses a chance to act at the beginning of a tick
* immediately after the current server time is incremented.
*/
virtual void onStartOfTick() {};
/*
* This method gives subclassses a chance to act at the end of ServerApp's
* tick processing, immediately before control returns to the caller of
* ServerApp::advanceTime();
*/
virtual void onTickProcessingComplete() {};
/*
* This method gives subclasses the tick period each time advanceTime is
* called
*/
void ServerApp::onTickPeriod( double tickPeriod )
{
if (tickPeriod * ServerAppConfig::updateHertz() > 2.0)
{
WARNING_MSG( "ServerApp::onTickPeriod: "
"Last game tick took %.2f seconds. Expected %.2f.\n",
tickPeriod, 1.0/ServerAppConfig::updateHertz() );
}
#if ENABLE_PROFILER
if (ServerAppConfig::hasHitchDetection() &&
((tickPeriod * ServerAppConfig::updateHertz()) >
ServerAppConfig::hitchDetectionThreshold()))
{
WARNING_MSG( "Service::onTickPeriod: "
"Server hitch detected, creating JSON dump.\n" );
g_profiler.dumpThisFrame();
}
#endif
}
注册互联
Bigworld服务器集群里会存在各种角色的进程,包括DBApp、GameApp、LoginApp等。对于DBApp、BaseApp、CellApp等角色的进程,在一个集群里会有一个到多个具体的实例。为了统一管理这些不定数量的角色实例,Bigworld里设计了DBAppMgr、BaseAppMgr和CellAppMgr等管理器。在这些DBApp、BaseApp、CellApp等角色的进程启动之后,会将这些进程注册到对应的Mgr实例里。这个注册机制依赖于ServerApp上提供的一个ManagerAppGateway类型的对象,默认情况下ServerApp::pManagerAppGateway返回空,只有在这些需要被统一管理的进程角色里会对这个接口进行重载:
// lib\server\server_app.hpp
virtual ManagerAppGateway * pManagerAppGateway() { return NULL; }
// server\baseapp\baseapp.hpp
ManagerAppGateway * pManagerAppGateway() /* override */
{
return &baseAppMgr_;
}
// server\cellapp\cellapp.hpp
ManagerAppGateway * pManagerAppGateway() /* override */
{
return &cellAppMgr_;
}
// server\dbapp\dbapp.hpp
ManagerAppGateway * pManagerAppGateway() /* override */
{
return &dbAppMgr_;
}
这个ManagerAppGateway负责当前进程与对应的管理进程之间的注册与反注册,构造的时候需要提供通信用的网络通道networkInterface和反注册用的RPC retireAppIE:
ManagerAppGateway::ManagerAppGateway( Mercury::NetworkInterface & networkInterface,
const Mercury::InterfaceElement & retireAppIE ) :
channel_( networkInterface, Mercury::Address::NONE ),
retireAppIE_( retireAppIE )
{
MF_ASSERT( retireAppIE_.lengthStyle() == Mercury::FIXED_LENGTH_MESSAGE );
MF_ASSERT( retireAppIE_.lengthParam() == 0 );
}
// server\dbapp\dbappmgr_gateway.cpp
DBAppMgrGateway::DBAppMgrGateway( Mercury::NetworkInterface & interface ) :
ManagerAppGateway( interface, DBAppMgrInterface::retireApp )
{}
// server\cellapp\cellappmgr_gateway.cpp
CellAppMgrGateway::CellAppMgrGateway( Mercury::NetworkInterface & interface ) :
ManagerAppGateway( interface, CellAppMgrInterface::retireApp )
{}
// server\baseapp\baseappmgr_gateway.cpp
BaseAppMgrGateway::BaseAppMgrGateway( Mercury::NetworkInterface & interface ) :
ManagerAppGateway( interface, BaseAppMgrInterface::retireApp )
{}
这个ManagerAppGateway构造函数并不直接负责注册,依赖于外部来调用ManagerAppGateway::init来初始化底层的通道Channel。接口需要提供能与目标ManagerApp通信的interfaceName,通过这个interfaceName来查询对应的通信地址:
bool ManagerAppGateway::init( const char * interfaceName, int numRetries, float maxMgrRegisterStagger )
{
if (!almostZero( maxMgrRegisterStagger ))
{
const float MICROSECONDS_IN_SECOND = 1000000.0f;
// Spread starting time of processes within a tick to avoid possible network peaks during startup
BWRandom rand( mf_getpid() );
uint32 delay = static_cast<uint>( rand( 0.0f, maxMgrRegisterStagger ) * MICROSECONDS_IN_SECOND );
if (delay > 0)
{
DEBUG_MSG( "ManagerAppGateway::init: "
"Manager Registration Stagger mode is active: maxMgrRegisterStagger: %f s. "
"Delaying process start for %f s.\n",
maxMgrRegisterStagger, delay / MICROSECONDS_IN_SECOND );
usleep( delay );
}
}
Mercury::Address addr;
Mercury::Reason reason =
Mercury::MachineDaemon::findInterface( interfaceName,
0, addr, numRetries );
if (reason == Mercury::REASON_SUCCESS)
{
channel_.addr( addr );
}
// This channel is irregular until we start the game tick timer.
this->isRegular( false );
return reason == Mercury::REASON_SUCCESS;
}
这个Mercury::MachineDaemon::findInterface函数负责查询interfaceName对应的通信地址addr,具体做法是向当前进程所属的MachineDaemon发送一个广播消息ProcessStatsMessage,查询所有能提供interfaceName的进程的通信地址,获取第一个返回的通信地址:
/**
* This method finds a specified interface on the network.
* WARNING: This function always blocks.
*
* @param name Only interfaces with this name are considered.
* @param id Only interfaces with this ID are considered, if
* negative all are considered.
* @param addr Output address of the found interface.
* @param retries The number of retries if no interface is found.
* @param verboseRetry Flag for versbose output on retry.
* @param pHandler Handler to process ProcessStatsMessages responses. If
* NULL, the default FindFirstInterfaceHandler is used.
*
* @return A Mercury::REASON_SUCCESS if an interface was found, a
* Mercury::REASON_TIMER_EXPIRED if an interface was not found, other
* Mercury::Reasons are returned if there is an error.
*/
Reason findInterface( const char * name, int id,
Address & addr, int retries, bool verboseRetry,
IFindInterfaceHandler * pHandler )
{
ProcessStatsMessage pm;
pm.param_ = pm.PARAM_USE_CATEGORY |
pm.PARAM_USE_UID |
pm.PARAM_USE_NAME |
(id <= 0 ? 0 : pm.PARAM_USE_ID);
pm.category_ = pm.SERVER_COMPONENT;
pm.uid_ = getUserId();
pm.name_ = name;
pm.id_ = id;
IFindInterfaceHandler * pDefaultHandler = NULL;
if (!pHandler)
{
pDefaultHandler = new FindFirstInterfaceHandler();
pHandler = pDefaultHandler;
}
int attempt = 0;
retries = std::max( retries, 1 );
while (++attempt <= retries)
{
Reason reason = pm.sendAndRecv( 0, BROADCAST, pHandler );
if (reason != REASON_SUCCESS)
{
return reason;
}
if (pHandler->hasError())
{
return REASON_GENERAL_NETWORK;
}
Address result = pHandler->result();
if (result != Address::NONE)
{
addr = result;
return REASON_SUCCESS;
}
if (verboseRetry)
{
INFO_MSG( "MachineDaemon::findInterface: "
"Failed to find %s for UID %d on attempt %d.\n",
name, pm.uid_, attempt );
}
// Sleep a little because sendAndReceiveMGM() is too fast now! :)
#if defined( PLAYSTATION3 )
sys_timer_sleep( 1 );
#elif !defined( _WIN32 )
sleep( 1 );
#else
Sleep( 1000 );
#endif
}
if (pDefaultHandler)
{
bw_safe_delete( pDefaultHandler );
}
return REASON_TIMER_EXPIRED;
}
这个findInterface的核心点在于ProcessStatsMessage的广播,需要探究一下这个消息是如何通知到当前服务器所在集群的MachineDaemon的。
Mercury::Reason MachineGuardMessage::sendAndRecv( uint32 srcip, uint32 destaddr,
ReplyHandler *pHandler )
{
BW_GUARD;
// Set up socket
Endpoint ep;
ep.socket( SOCK_DGRAM );
if (!ep.good() || ep.bind( 0, srcip ) != 0)
{
return Mercury::REASON_GENERAL_NETWORK;
}
return this->sendAndRecv( ep, destaddr, pHandler );
}
/**
* This method sends this MachineGuardMessage message to the bwmachined at the
* input address. The reply messages received are handled by the provided
* handler.
*
* Note: If sending to BROADCAST, REASON_TIMER_EXPIRED will be returned if not
* all bwmachined daemons reply, even if some are successful.
*/
Mercury::Reason MachineGuardMessage::sendAndRecv( Endpoint &ep, uint32 destaddr,
ReplyHandler *pHandler )
{
BW_GUARD;
if (destaddr == BROADCAST)
{
ep.setbroadcast( true );
}
char recvbuf[ MGMPacket::MAX_SIZE ];
MachineGuardResponseChecker responseChecker;
int countdown = 3;
while (countdown--)
{
if (!this->sendto( ep, htons( PORT_MACHINED ), destaddr,
MGMPacket::PACKET_STAGGER_REPLIES ))
{
ERROR_MSG( "MachineGuardMessage::sendAndRecv: "
"Failed to send entire MGM (#%d tries left)\n",
countdown );
continue;
}
// 省略很多代码
}
ERROR_MSG( "MachineGuardMessage::sendAndRecv: timed out!\n" );
return Mercury::REASON_TIMER_EXPIRED;
}
这里的sendto函数调用的时候,第二个参数是端口号PORT_MACHINED,这个端口号是bwmachined的监听端口,用于接收MachineGuardMessage消息。第三个参数是目标机器的地址,当前执行findInterface的时候被设置为了const uint32 BROADCAST = 0xFFFFFFFF;,表示广播发送。这个广播的UDP消息包只会发送到当前本地网络里的所有设备,不会被路由器转发到其他网段。这样粗暴的使用广播数据包来执行服务发现的方法真是令人叹为观止!这种方法的成功依赖于局域网内有开启PORT_MACHINED的监听的进程,目前Bigworld里使用了一个单独的BWMachineD进程来监听这个端口
/**
* Discover the broadcast interface to use and init all the endpoints.
*/
void BWMachined::initNetworkInterfaces()
{
// Determine which network interface will be sending broadcast messages
if (broadcastAddr_ == 0 && !this->findBroadcastInterface())
{
syslog( LOG_CRIT, "Failed to determine default broadcast interface. "
"Make sure UDP ports %d and %d are not firewalled and that "
"your broadcast route is set correctly. e.g. "
"/sbin/ip route add broadcast 255.255.255.255 dev eth0",
PORT_MACHINED, PORT_BROADCAST_DISCOVERY );
exit( EXIT_FAILURE );
}
if (!ep_.good() ||
ep_.bind( htons( PORT_MACHINED ), broadcastAddr_ ) == -1)
{
syslog( LOG_CRIT, "Failed to bind socket to '%s'. %s.",
inet_ntoa((struct in_addr &)broadcastAddr_),
strerror(errno) );
exit( EXIT_FAILURE );
}
ep_.setbroadcast( true );
if (!epLocal_.good() ||
epLocal_.bind( htons( PORT_MACHINED ), LOCALHOST ) == -1)
{
syslog( LOG_CRIT, "Failed to bind socket to (lo). %s.",
strerror(errno) );
exit( EXIT_FAILURE );
}
if (!epBroadcast_.good() ||
epBroadcast_.bind( htons( PORT_MACHINED ), BROADCAST ) == -1)
{
syslog( LOG_CRIT, "Failed to bind socket to '%s'. %s.",
inet_ntoa((struct in_addr &)BROADCAST),
strerror(errno) );
exit( EXIT_FAILURE );
}
cluster_.ownAddr_ = broadcastAddr_;
}
这个BWMachineD进程在每个物理机上都有一个实例,然后多个实例之间通过PORT_MACHINED端口进行通信,从而组成一个集群cluster。
当一个ServerApp启动的时候,会将当前进程的地址、端口、pid、网络角色等信息打包成一个MachineGuardMessage消息,然后发送到本机的PORT_MACHINED端口。
// server\baseapp\baseapp.cpp
/**
* This method does the portion of the init after this app has been
* successfully added to the BaseAppMgr.
*/
bool BaseApp::finishInit( const BaseAppInitData & initData,
BinaryIStream & stream )
{
// 省略一些代码
if (isServiceApp_)
{
BaseAppIntInterface::registerWithMachinedAs( "ServiceAppInterface",
this->intInterface(), id_ );
}
else
{
BaseAppIntInterface::registerWithMachined( this->intInterface(), id_ );
}
}
// server\baseappmgr\baseappmgr.cpp
/**
* This method initialises this object.
*
* @return True on success, false otherwise.
*/
bool BaseAppMgr::init( int argc, char * argv[] )
{
// 省略一些代码
Mercury::Reason reason =
BaseAppMgrInterface::registerWithMachined( interface_, 0 );
}
// server\cellapp\cellapp.cpp
/**
* This method handles the portion of init after registering with the
* CellAppMgr.
*/
bool CellApp::finishInit( const CellAppInitData & initData )
{
// 省略一些代码
CellAppInterface::registerWithMachined( this->interface(), id_ );
}
// server\cellappmgr\cellappmgr.cpp
/**
* The initialisation method.
*/
bool CellAppMgr::init( int argc, char * argv [] )
{
// 省略一些代码
{
CellAppMgrInterface::registerWithInterface( interface_ );
Mercury::Reason reason =
CellAppMgrInterface::registerWithMachined( interface_, 0 );
}
}
// server\dbapp\dbapp.cpp
/**
* This method performs initialisation for our newly received DBApp ID.
*
* @return true on success, false otherwise.
*/
bool DBApp::initAppIDRegistration()
{
// 省略一些代码
if (DBAppInterface::registerWithMachined( interface_, id_ ) !=
Mercury::REASON_SUCCESS)
{
NETWORK_ERROR_MSG( "DBApp::initAppIDRegistration: "
"Unable to register with interface. Is machined running?\n" );
return false;
}
}
// server\dbappmgr\dbappmgr.cpp
bool DBAppMgr::init( int argc, char * argv [] )
{
// 省略一些代码
reason =
DBAppMgrInterface::registerWithMachined( interface_, 0 );
}
// server\loginapp\loginapp.cpp
/**
* This method completes initialisation after registration to DBAppMgr.
*
* @param appID The LoginApp ID.
* @param dbAppAlphaAddress The address of DBApp Alpha.
*/
bool LoginApp::finishInit( LoginAppID appID,
const Mercury::Address & dbAppAlphaAddress )
{
DEBUG_MSG( "LoginApp::finishInit: id %d (DBApp Alpha: %s)\n",
appID, dbAppAlphaAddress.c_str() );
id_ = appID;
dbAppAlpha_.addr( dbAppAlphaAddress );
Mercury::Reason reason =
LoginIntInterface::registerWithMachined( this->intInterface(), id_ );
// 省略一些代码
}
上述各种Interface::registerWithMachined执行的都是NetworkInterface::registerWithMachined:
/**
* This method is used to register or deregister an interface with the machine
* guard (a.k.a. machined).
*/
Reason NetworkInterface::registerWithMachined(
const BW::string & name, int id )
{
return this->interfaceTable().registerWithMachined( this->address(),
name, id );
}
Reason InterfaceTable::registerWithMachined( const Address & addr )
{
return MachineDaemon::registerWithMachined( addr,
name_, id_ );
}
Reason InterfaceTable::registerWithMachined( const Address & addr,
const BW::string & name, int id )
{
name_ = name;
id_ = id;
return this->registerWithMachined( addr );
}
最后这里的MachineDaemon::registerWithMachined就是往LOCALHOST的BWMachineD发送注册信息的过程:
/**
* This function registers a socket with BWMachined.
*/
Reason registerWithMachined( const Address & srcAddr,
const BW::string & name, int id, bool isRegister )
{
if (name.empty())
{
return REASON_SUCCESS;
}
#ifdef MF_SERVER
// Do not call blocking reply handler after registering with bwmachined as
// other processes can now find us and send other messages.
BlockingReplyHandler::safeToCall( false );
#endif
ProcessMessage pm;
pm.param_ = (isRegister ? pm.REGISTER : pm.DEREGISTER) |
pm.PARAM_IS_MSGTYPE;
pm.category_ = ProcessMessage::SERVER_COMPONENT;
pm.port_ = srcAddr.port;
pm.name_ = name;
pm.id_ = id;
pm.majorVersion_ = BWVersion::majorNumber();
pm.minorVersion_ = BWVersion::minorNumber();
pm.patchVersion_ = BWVersion::patchNumber();
ProcessMessageHandler pmh;
// send and wait for the reply
const uint32 destAddr = LOCALHOST;
Reason response = pm.sendAndRecv( srcAddr.ip, destAddr, &pmh );
return pmh.hasResponded_ ? response : REASON_TIMER_EXPIRED;
}
注意到这里只是往LOCALHOST发送注册消息,而不是广播发送到PORT_MACHINED端口,这样做是没什么问题的。因为目前BWMachineD之间并不会执行信息共享,所以只需要往LOCALHOST发送注册消息即可。只要查询的时候使用的是广播接口0xffffffff就没问题,因为广播消息总是可以发送到同一个集群里的各个物理机器。
当BWMachineD收到注册消息后,会将其记录到其内部的数组procs_中,这个数组里的每个元素都代表本机里能提供注册服务的一个进程:
/**
* @internal
* A ProcessStatsMessage is a more detailed version of ProcessMessage that
* provides CPU and memory usage info about the process.
*/
class ProcessStatsMessage : public ProcessMessage
{
public:
uint8 cpu_; //!< Process cpu as proportion of max
uint8 mem_; //!< Mem usage as proportion of max
ProcessStatsMessage() : ProcessMessage(), cpu_( 0 ), mem_( 0 )
{
message_ = MachineGuardMessage::PROCESS_STATS_MESSAGE;
}
virtual ~ProcessStatsMessage() {}
virtual const char *c_str() const;
protected:
virtual void writeImpl( BinaryOStream &os );
virtual void readImpl( BinaryIStream &is );
};
struct ProcessInfo
{
ProcessInfo() { starttime = 0; }
HighResStat cpu, mem;
int affinity;
ProcessStatsMessage m;
// Time (since OS boot) that the process was started
unsigned long int starttime;
// Platform specific implementation
void init( const ProcessMessage &pm );
};
BW::vector< ProcessInfo > procs_;
当接收到findInterface对应的查询请求PROCESS_STATS_MESSAGE的时候,只需要遍历procs_数组,找到所有符合条件的进程,然后将其信息打包成一个ProcessStatsMessage发送回查询请求者:
// bool BWMachined::handleMessage( Endpoint & ep, sockaddr_in & sin,
// MachineGuardMessage & mgm, MGMPacket & replies )
case MachineGuardMessage::PROCESS_STATS_MESSAGE:
{
ProcessStatsMessage &query = static_cast< ProcessStatsMessage& >( mgm );
// Find the processes this matches
bool found = false;
for (BW::vector< ProcessInfo >::iterator it = procs_.begin();
it != procs_.end(); ++it)
{
ProcessInfo &pi = *it;
if (pi.m.matches( query ))
{
// Update load and mem stats on the MGM for this process
SystemInfo &si = systemInfo_;
uint64 cpuDiff = std::max(
si.cpu[ pi.affinity % si.nCpus ].max.delta(),
(uint64)1 );
if (pi.affinity >= (int)si.nCpus)
syslog( LOG_ERR, "ProcessInfo (%s) has invalid affinity %d",
pi.m.c_str(), pi.affinity );
ProcessStatsMessage &reply = pi.m;
reply.cpu_ = (uint8)(pi.cpu.delta()*0xff / cpuDiff);
reply.mem_ = (uint8)(pi.mem.cur()*0xff / si.mem.max.cur());
// Add reply to the stream
reply.copySeq( query );
replies.append( reply );
found = true;
}
}
// If nothing found, send back a message with pid == 0 so that recv
// loops can terminate on client side
if (!found)
{
query.pid_ = 0;
query.outgoing( true );
replies.append( query );
}
return true;
}
当findInterface返回的时候, MgrAppGateway就可以根据结果来正确的获取目标MgrApp的地址信息,并根据地址信息来创建到MgrApp的channel,后续可以通过这个channel来发送注册消息。例如BaseAppMgrGateway上就暴露了一个add方法,来通知到对应的BaseAppMgr进程:
void BaseAppMgrGateway::add( const Mercury::Address & addrForCells,
const Mercury::Address & addrForClients, bool isServiceApp,
Mercury::ReplyMessageHandler * pHandler )
{
Mercury::Bundle & bundle = channel_.bundle();
BaseAppMgrInterface::addArgs * args =
(BaseAppMgrInterface::addArgs*)bundle.startStructRequest(
BaseAppMgrInterface::add, pHandler );
args->addrForCells = addrForCells;
args->addrForClients = addrForClients;
args->isServiceApp = isServiceApp;
channel_.send();
}
这个add的调用发生在BaseApp::init函数的最末尾:
/**
* This method initialises this object.
*
* @param argc The number of elements in argv.
* @param argv An array of argument strings.
*/
bool BaseApp::init( int argc, char * argv[] )
{
if (!this->EntityApp::init( argc, argv ))
{
return false;
}
// 省略很多逻辑
// Add ourselves to the BaseAppMgr. Init will continue once the reply to
// this message is received. This object deletes itself.
new AddToBaseAppMgrHelper( *this );
return true;
}
/**
* This class helps to add this app to the BaseAppMgr.
*/
class AddToBaseAppMgrHelper : public AddToManagerHelper
{
public:
AddToBaseAppMgrHelper( BaseApp & baseApp ) :
AddToManagerHelper( baseApp.mainDispatcher() ),
app_( baseApp )
{
// Auto-send on construction.
this->send();
}
void handleFatalTimeout()
{
ERROR_MSG( "AddToBaseAppMgrHelper::handleFatalTimeout: Unable to add "
"%s to BaseAppMgr. Terminating.\n", app_.getAppName());
app_.shutDown();
}
void doSend()
{
app_.baseAppMgr().add( app_.intInterface().address(),
app_.extInterface().address(), app_.isServiceApp(), this );
}
bool finishInit( BinaryIStream & data )
{
// Call send now so that any pending ACKs are sent now. This gives
// finishInit a little longer before resends occur. Should not rely
// on this as BaseAppMgr already thinks this BaseApp is running
// normally.
// TODO:BAR
app_.baseAppMgr().send();
BaseAppInitData initData;
data >> initData;
return app_.finishInit( initData, data );
}
private:
BaseApp & app_;
};
由于BaseApp,CellApp,DbApp都有注册到对应的MgrApp的需求,所以这里还抽象出来一个基类AddToManagerHelper,来统一处理注册逻辑。这个基类的send方法会调用到子类实现的doSend方法,来发送注册消息,对于BaseApp而言就是调用BaseAppMgrGateway::add方法。只有对应的MgrApp响应了这个Add注册请求之后,对应的App角色的finishInit接口才会执行,此时这个App相当于切换到了可用状态,可以承接后续的业务处理了。一般来说所有的游戏帧的启动都是在这个finishInit里,下面就是CellApp::finishInit的逻辑:
/**
* This method handles the portion of init after registering with the
* CellAppMgr.
*/
bool CellApp::finishInit( const CellAppInitData & initData )
{
// Make sure that nothing else is read in the main thread.
BWResource::watchAccessFromCallingThread( true );
if (int32( initData.id ) == -1)
{
ERROR_MSG( "CellApp::finishInit: "
"CellAppMgr refused to let us join.\n" );
return false;
}
id_ = initData.id;
this->setStartTime( initData.time );
baseAppAddr_ = initData.baseAppAddr;
dbAppAlpha_.addr( initData.dbAppAlphaAddr );
isReadyToStart_ = initData.isReady;
// Attach ourselves to an ID server (in this case, the Alpha DBApp).
if (!idClient_.init( &this->dbAppAlpha(),
DBAppInterface::getIDs,
DBAppInterface::putIDs,
IDConfig::criticallyLowSize(),
IDConfig::lowSize(),
IDConfig::desiredSize(),
IDConfig::highSize() ))
{
ERROR_MSG( "CellApp::finishInit: Failed to get IDs\n" );
return false;
}
timeoutPeriod_ = initData.timeoutPeriod;
// Send app id to loggers
LoggerMessageForwarder::pInstance()->registerAppID( id_ );
if (isReadyToStart_)
{
this->startGameTime();
}
else
{
// Let startup() message handler start the game timer.
isReadyToStart_ = true;
}
CONFIG_INFO_MSG( "\tCellApp ID = %d\n", id_ );
CONFIG_INFO_MSG( "\tstarting time = %.1f\n",
this->gameTimeInSeconds() );
CellAppInterface::registerWithMachined( this->interface(), id_ );
// We can safely register a birth listener now since we have mapped the
// interfaces we are serving.
Mercury::MachineDaemon::registerBirthListener(
this->interface().address(),
CellAppInterface::handleCellAppMgrBirth, "CellAppMgrInterface" );
// init the watcher stuff
char abrv[32];
bw_snprintf( abrv, sizeof(abrv), "cellapp%02d", id_ );
BW_REGISTER_WATCHER( id_, abrv, "cellApp",
mainDispatcher_, this->interface().address() );
int pythonPort = BWConfig::get( "cellApp/pythonPort",
PORT_PYTHON_CELLAPP + id_ );
this->startPythonServer( pythonPort, id_ );
INFO_MSG( "CellApp::finishInit: CellAppMgr acknowledged our existence.\n" );
return true;
}
在上面的startGameTime会开启游戏帧定时器:
/**
* This method is called when we are ready to start the game timer.
*/
void CellApp::startGameTime()
{
INFO_MSG( "CellApp is starting\n" );
MF_ASSERT( !gameTimer_.isSet() && (pTimeKeeper_ == NULL) );
MF_ASSERT( cellAppMgr_.isInitialised() );
// start the game timer
gameTimer_ = this->mainDispatcher().addTimer(
1000000/Config::updateHertz(), this,
reinterpret_cast< void * >( TIMEOUT_GAME_TICK ),
"GameTick" );
lastGameTickTime_ = timestamp();
gettimeofday( &oldTimeval_, NULL );
mainDispatcher_.clearSpareTime();
this->calcTransientLoadTime();
pTimeKeeper_ = new TimeKeeper( interface_, gameTimer_, time_,
Config::updateHertz(), cellAppMgr_.addr(),
&CellAppMgrInterface::gameTimeReading,
id_, Config::maxTickStagger() );
// Now we're sending load updates to the CellAppMgr regularly
cellAppMgr_.isRegular( true );
}
当gameTimer_成功创建之后,当前CellApp的所有前期准备工作就都做完了,游戏帧定时器也开始运行了,在CellApp的handleTimeout方法中会通过handleGameTickTimeSlice来间接调用之前介绍的ServerApp::advanceTime来驱动各个Tick相关的回调:
/**
* This method handles timeout events.
*/
void CellApp::handleTimeout( TimerHandle /*handle*/, void * arg )
{
switch (reinterpret_cast<uintptr>( arg ))
{
case TIMEOUT_GAME_TICK:
this->handleGameTickTimeSlice();
break;
case TIMEOUT_TRIM_HISTORIES:
this->handleTrimHistoriesTimeSlice();
break;
case TIMEOUT_LOADING_TICK:
{
// 省略一些代码
}
}
}
/**
* This method handles the game tick time slice.
*/
void CellApp::handleGameTickTimeSlice()
{
// 省略一些代码
this->updateBoundary();
// Increment the time - we are now into the quantum of the next tick
this->advanceTime();
// 省略一些代码
}
对于LoginApp来说,虽然没有对应的LoginAppMgr,但是由于其业务重度依赖于数据库,所以在完成基础的初始化init之后,需要连接到DBApMgr来完成必要的初始化工作,:
bool LoginApp::init( int argc, char * argv[] ) /* override */
{
// 省略一些代码
// This calls back on finishInit().
new AddToDBAppMgrHelper( *this );
// 省略一些代码
}
在这个注册函数里会发送一个DBAppMgrInterface::addLoginApp的请求到dbAppMgr,在注册完成之后才会调用到LoginApp::finishInit来开启游戏帧Tick计时器:
class AddToDBAppMgrHelper : public AddToManagerHelper
{
public:
/**
* Constructor.
*
* @param loginApp The LoginApp instance.
*/
AddToDBAppMgrHelper( LoginApp & loginApp ) :
AddToManagerHelper( loginApp.mainDispatcher() ),
app_( loginApp )
{
// Auto-send on construction.
this->send();
}
/* Override from AddToManagerHelper. */
void handleFatalTimeout() /* override */
{
ERROR_MSG( "AddToDBAppMgrHelper::handleFatalTimeout: Unable to add "
"LoginApp to DBAppMgr (%s). Terminating.\n",
app_.dbAppMgr().addr().c_str() );
app_.mainDispatcher().breakProcessing();
}
/* Override from AddToDBAppHelper. */
void doSend() /* override */
{
Mercury::Bundle & bundle = app_.dbAppMgr().bundle();
bundle.startRequest( DBAppMgrInterface::addLoginApp, this );
app_.dbAppMgr().send();
}
/* Override from AddToDBAppHelper. */
bool finishInit( BinaryIStream & data ) /* override */
{
LoginAppID appID;
Mercury::Address dbAppAlphaAddr;
data >> appID >> dbAppAlphaAddr;
return app_.finishInit( appID, dbAppAlphaAddr );
}
private:
LoginApp & app_;
};
这里的doSend会往LoginApp::dbAppMgr发送消息,但是发送消息的前提是知道发送目标的ip:port,这个dbAppMgr的地址获取是在LoginApp::init中完成的:
bool LoginApp::init( int argc, char * argv[] ) /* override */
{
// 省略一些代码
int numStartupRetries = Config::numStartupRetries();
if (!BW_INIT_ANONYMOUS_CHANNEL_CLIENT( dbAppMgr_, this->intInterface(),
LoginIntInterface, DBAppMgrInterface, numStartupRetries ))
{
ERROR_MSG( "LoginApp::init: Could not find DBAppMgr\n" );
return false;
}
// 省略一些代码
}
这个宏展开之后会调用下面的AnonymousChannelClient::init方法,内部会调用MachineDaemon::findInterface来查找DBAppMgr的地址:
/**
* This method initialises this object.
*
* @return true on success, otherwise false.
*/
bool AnonymousChannelClient::init( Mercury::NetworkInterface & interface,
Mercury::InterfaceMinder & interfaceMinder,
const Mercury::InterfaceElement & birthMessage,
const char * componentName,
int numRetries )
{
interfaceName_ = componentName;
bool result = true;
interfaceMinder.handler( birthMessage.id(), this );
if (Mercury::MachineDaemon::registerBirthListener(
interface.address(), birthMessage, componentName ) !=
Mercury::REASON_SUCCESS)
{
NETWORK_ERROR_MSG( "AnonymousChannelClient::init: "
"Failed to register birth listener for %s\n",
componentName );
result = false;
}
Mercury::Address serverAddr( Mercury::Address::NONE );
if (Mercury::MachineDaemon::findInterface(
componentName, 0, serverAddr, numRetries ) !=
Mercury::REASON_SUCCESS)
{
result = false;
}
// Everyone talking to another process via this mechanism is doing it
// irregularly at the moment. Could make this optional.
pChannelOwner_ = new Mercury::ChannelOwner( interface, serverAddr );
pChannelOwner_->channel().isLocalRegular( false );
pChannelOwner_->channel().isRemoteRegular( false );
return result;
}
如果LoginApp启动的时候对应的DBAppMgr进程还没有启动,上面注册的BirthListener就会起作用,因为其会监听广播出来的DBAppMgrInterfaceBirth消息。
/**
* This method is used to register a birth or death listener with machined.
*/
Reason registerListener( const Address & srcAddr,
UDPBundle & bundle, int addrStart,
const char * ifname, bool isBirth, bool anyUID = false )
{
// finalise the bundle first
bundle.finalise();
const Packet * p = bundle.pFirstPacket();
MF_ASSERT( p->flags() == 0 );
// prepare the message for machined
ListenerMessage lm;
lm.param_ = (isBirth ? lm.ADD_BIRTH_LISTENER : lm.ADD_DEATH_LISTENER) |
lm.PARAM_IS_MSGTYPE;
lm.category_ = lm.SERVER_COMPONENT;
lm.uid_ = anyUID ? lm.ANY_UID : getUserId();
lm.pid_ = mf_getpid();
lm.port_ = srcAddr.port;
lm.name_ = ifname;
const int addrLen = 6;
unsigned int postSize = p->totalSize() - addrStart - addrLen;
lm.preAddr_ = BW::string( p->data(), addrStart );
lm.postAddr_ = BW::string( p->data() + addrStart + addrLen, postSize );
const uint32 destAddr = LOCALHOST;
return lm.sendAndRecv( srcAddr.ip, destAddr, NULL );
}
/**
* This method registers a callback with machined to be called when a certain
* type of process is started.
*
* @note This needs to be fixed up if rebind is called on this nub.
*/
Reason registerBirthListener( const Address & srcAddr,
UDPBundle & bundle, int addrStart, const char * ifname )
{
return registerListener( srcAddr, bundle, addrStart, ifname, true );
}
这个ListenerMessage的发送目标还是LOCALHOST,当LOCALHOST上的BWMachineD接收到这个消息的时候,会先存储这个Listener到内部的birthListeners_数组里:
bool BWMachined::handleMessage( Endpoint & ep, sockaddr_in & sin,
MachineGuardMessage & mgm, MGMPacket & replies )
{
// If the message we received is not known, don't attempt to process
// any further. It is possible the data received is a valid structure
// that has had its tail corrupted.
if (mgm.flags_ & mgm.MESSAGE_NOT_UNDERSTOOD)
{
syslog( LOG_ERR, "Received unknown message: %s", mgm.c_str() );
mgm.outgoing( true );
replies.append( mgm );
return true;
}
switch (mgm.message_)
{
case MachineGuardMessage::LISTENER_MESSAGE:
{
ListenerMessage &lm = static_cast< ListenerMessage& >( mgm );
if (lm.param_ == (lm.ADD_BIRTH_LISTENER | lm.PARAM_IS_MSGTYPE))
{
birthListeners_.add( lm, sin.sin_addr.s_addr );
}
else if (lm.param_ == (lm.ADD_DEATH_LISTENER | lm.PARAM_IS_MSGTYPE))
{
deathListeners_.add( lm, sin.sin_addr.s_addr );
}
else
{
syslog( LOG_ERR, "Unrecognised param field for ListenerMessage: %x",
lm.param_ );
return false;
}
// Ack to sender
lm.outgoing( true );
replies.append( lm );
return true;
}
// 省略其他代码
}
}
但是DBAppMgr可能是在其他机器上启动的,所以肯定有某种机制在DBAppMgr启动之后,将其ip:port广播出来,为了跟踪这个广播过程,我们需要查看使用这个BirthListener的地方,依然在BWMachined::handleMessage方法中,在处理ProcessMessage::NOTIFY_BIRTH消息的分支里:
case MachineGuardMessage::PROCESS_MESSAGE:
{
ProcessMessage &pm = static_cast< ProcessMessage& >( mgm );
// Explicit instances of this class shouldn't be using param filters
if (!(pm.param_ & pm.PARAM_IS_MSGTYPE))
{
syslog( LOG_ERR, "PROCESS_MESSAGE tried to use param filters! (%x)",
pm.param_ );
return false;
}
int msgtype = pm.param_ & ~pm.PARAM_IS_MSGTYPE;
// Don't allow other machines to register/deregister their processes
// on this machine.
if ((msgtype == pm.REGISTER || msgtype == pm.DEREGISTER) &&
(uint32&)sin.sin_addr != cluster_.ownAddr_ &&
(uint32&)sin.sin_addr != LOCALHOST)
{
syslog( LOG_ERR, "%s tried to register a process here!",
inet_ntoa( sin.sin_addr ) );
return false;
}
switch (msgtype)
{
case ProcessMessage::REGISTER:
{
// 省略相关代码
}
case ProcessMessage::DEREGISTER:
{
// 省略相关代码
}
case ProcessMessage::NOTIFY_BIRTH:
{
birthListeners_.handleNotify( this->endpoint(), pm, sin.sin_addr );
return true;
}
case ProcessMessage::NOTIFY_DEATH:
{
deathListeners_.handleNotify( this->endpoint(), pm, sin.sin_addr );
return true;
}
default:
syslog( LOG_ERR, "Unrecognised ProcessMessage type: %d", msgtype );
return false;
}
}
这里接收到ProcessMessage::NOTIFY_BIRTH类型的消息的时候会调用Listeners::handleNotify方法,这个方法会遍历内部的birthListeners_数组,将消息发送给所有注册的BirthListener:
void Listeners::handleNotify( const Endpoint & endpoint,
const ProcessMessage & pm, in_addr addr )
{
char address[6];
memcpy( address, &addr, sizeof( addr ) );
memcpy( address + sizeof( addr ), &pm.port_, sizeof( pm.port_ ) );
Members::iterator iter = members_.begin();
while (iter != members_.end())
{
ListenerMessage &lm = iter->lm_;
if (lm.category_ == pm.category_ &&
(lm.uid_ == lm.ANY_UID || lm.uid_ == pm.uid_) &&
(lm.name_ == pm.name_ || lm.name_.size() == 0))
{
int msglen = lm.preAddr_.size() + sizeof( address ) +
lm.postAddr_.size();
char *data = new char[ msglen ];
int preSize = lm.preAddr_.size();
int postSize = lm.postAddr_.size();
memcpy( data, lm.preAddr_.c_str(), preSize );
memcpy( data + preSize, address, sizeof( address ) );
memcpy( data + preSize + sizeof( address ), lm.postAddr_.c_str(),
postSize );
// and send to the appropriate port locally
endpoint.sendto( data, msglen, lm.port_, iter->addr_ );
delete [] data;
}
++iter;
}
}
因此需要继续跟踪一下这个ProcessMessage::NOTIFY_BIRTH消息又是在哪里发出来的,还是在上面的MachineGuardMessage::PROCESS_MESSAGE分支里,不过是在处理ProcessMessage::REGISTER消息的时候,这里会使用broadcastToListeners执行NOTIFY_BIRTH消息的广播,这个broadcastToListeners的发送目标是广播地址BROADCAST,因此任何本地局域网里的机器上的BWMachineD都可以接收到这个NOTIFY_BIRTH的消息:
switch (msgtype)
{
case ProcessMessage::REGISTER:
{
// 省略一些代码
// platform-specific initialisation
pi.init( pm );
// Tell listeners about it
broadcastToListeners( pm, pm.NOTIFY_BIRTH );
// and confirm the registration to the sender
pm.outgoing( true );
replies.append( pm );
syslog( LOG_INFO, "Added %s for uid:%d (debug: at index %d)\n",
pm.c_str(), pm.uid_, i );
return true;
}
// 省略一些代码
}
/**
* Inform other machined's on the network about the birth or death of a process
* on this machine so that they can in turn inform their registered listeners
* about it.
*/
bool BWMachined::broadcastToListeners( ProcessMessage &pm, int type )
{
uint8 oldparam = pm.param_;
pm.param_ = type | pm.PARAM_IS_MSGTYPE;
bool ok = pm.sendto( ep_, htons( PORT_MACHINED ), BROADCAST,
MGMPacket::PACKET_STAGGER_REPLIES );
pm.param_ = oldparam;
return ok;
}
跟踪完整个BirthListener的调用链,可以知道当DBAppMgr进程启动之后,会向本地的BWMachineD发送ProcessMessage::REGISTER消息,本地的BWMachineD就会以ProcessMessage::NOTIFY_BIRTH类型的消息广播给所有局域网机器上注册的BirthListener,包括LoginApp,因此LoginApp就会收到BirthListener的回调,从而完成DBAppMgr的地址获取:
/**
* This method handles a birth message telling us that a new server has
* started.
*/
void AnonymousChannelClient::handleMessage(
const Mercury::Address & srcAddr,
Mercury::UnpackedMessageHeader & header,
BinaryIStream & data )
{
Mercury::Address serverAddr;
data >> serverAddr;
MF_ASSERT( data.remainingLength() == 0 && !data.error() );
pChannelOwner_->addr( serverAddr );
PROC_IP_INFO_MSG( "AnonymousChannelClient::handleMessage: "
"Got new %s at %s\n",
interfaceName_.c_str(), pChannelOwner_->channel().c_str() );
}
退出流程
基于信号的退出
Bigworld里的进程都注册了强制退出信号处理函数,目前处理的退出信号是SIGINT,在ServerApp的初始化函数init里会调用enableSignalHandler方法注册这个SIGINT信号处理函数到自身的pSignalHandler_:
bool ServerApp::init( int argc, char * argv[] )
{
// 省略一些代码
pSignalHandler_.reset( this->createSignalHandler() );
// Handle signals
this->enableSignalHandler( SIGINT );
return true;
}
SignalHandler * ServerApp::createSignalHandler()
{
return new ServerAppSignalHandler( *this );
}
/**
* Enables or disables the handling of a given signal by the ServerApp
* instance's signal handler.
*/
void ServerApp::enableSignalHandler( int sigNum, bool enable )
{
if (pSignalHandler_.get() == NULL)
{
ERROR_MSG( "ServerApp::enableSignalHandler: no signal handler set\n" );
return;
}
if (enable)
{
SignalProcessor::instance().addSignalHandler( sigNum,
pSignalHandler_.get() );
}
else
{
SignalProcessor::instance().clearSignalHandler( sigNum,
pSignalHandler_.get() );
}
}
这个pSignalHandler_负责将信号处理函数转发到ServerApp::onSignalled方法,这里会利用signal( sigNum, SIG_DFL )重置为默认的信号处理函数,来避免Python脚本那边设置了这个信号的处理,然后使用kill( getpid(), sigNum )来重新发送这个信号,从而导致进程的强制退出:
class ServerAppSignalHandler : public SignalHandler
{
public:
ServerAppSignalHandler( ServerApp & serverApp ):
serverApp_( serverApp )
{}
virtual ~ServerAppSignalHandler()
{}
virtual void handleSignal( int sigNum )
{
serverApp_.onSignalled( sigNum );
}
private:
ServerApp & serverApp_;
};
/**
* Default signal handling.
*/
void ServerApp::onSignalled( int sigNum )
{
switch (sigNum)
{
case SIGINT:
// This is really just to make sure that Python does not install its
// own SIGINT handler.
// It would probably be better to just revert to SIG_DFL straight after
// Py_Initialize in Script::init().
signal( sigNum, SIG_DFL );
kill( getpid(), sigNum );
break;
default:
break;
}
}
基于shutdown的退出
上述的强制kill会导致进程立即退出,缺少很多资源的释放与清理操作,所以一般不在迫不得已的情况下不会去使用这个退出信号。在ServerApp上提供了一个更加优雅的退出接口ServerApp::shutDown,ServerApp::shutDown方法里会调用mainDispatcher_.breakProcessing方法来通知事件循环处理processContinuously可以结束了:
void ServerApp::shutDown()
{
INFO_MSG( "ServerApp::shutDown: shutting down\n" );
mainDispatcher_.breakProcessing();
}
/**
* This method breaks out of 'processContinuously' at the next opportunity.
* Any messages in bundles that are being processed or timers that have
* expired will still get called. Note: if this is called from another
* thread it will NOT interrupt a select call if one is in progress, so
* processContinuously will not return. Try sending the process a (handled)
* signal if this is your intention.
*/
INLINE void EventDispatcher::breakProcessing( bool breakState )
{
breakProcessing_ = breakState;
}
void EventDispatcher::processContinuously()
{
breakProcessing_ = false;
while (!breakProcessing_)
{
this->processOnce( /* shouldIdle */ true );
}
}
当这个processContinuously方法返回时,ServerApp就会执行onRunComplete方法,开始执行清理操作:
/**
* This is the default implementation of run. Derived classes to override
* this to implement their own functionality.
*/
bool ServerApp::run()
{
mainDispatcher_.processUntilBreak();
this->onRunComplete();
return true;
}
这个onRunComplete是一个虚函数,目前在ServerApp里的实现是空,在子类里会根据需要去重载:
/*
* Override from ServerApp.
*/
void LoginApp::onRunComplete() /* override */
{
INFO_MSG( "LoginApp::run: Terminating normally.\n" );
this->ServerApp::onRunComplete();
bool sent = false;
if (this->isDBAppMgrReady())
{
Mercury::Bundle & dbMgrBundle = dbAppMgr_.pChannelOwner()->bundle();
DBAppMgrInterface::controlledShutDownArgs args;
args.stage = SHUTDOWN_REQUEST;
dbMgrBundle << args;
dbAppMgr_.pChannelOwner()->send();
sent = true;
}
if (sent)
{
this->intInterface().processUntilChannelsEmpty();
}
}
/**
* This method is run when the runloop has finished.
*/
void DBApp::onRunComplete()
{
this->ScriptApp::onRunComplete();
this->finalise();
}
在ServerApp::run返回之后,会接下来执行fini方法来清理,并在fini结束之后调用interface_.prepareForShutdown方法来准备网络接口的关闭:
bool ServerApp::runApp( int argc, char * argv[] )
{
// calculate the clock speed
stampsPerSecond();
bool result = false;
if (this->init( argc, argv ))
{
INFO_MSG( "---- %s is running ----\n", this->getAppName() );
result = this->run();
}
else
{
ERROR_MSG( "Failed to initialise %s\n", this->getAppName() );
}
this->fini();
interface_.prepareForShutdown();
#if ENABLE_PROFILER
g_profiler.fini();
#endif
return result;
}
这个fini方法也是一个声明在ServerApp里的空的虚函数,在子类里会根据需要去重载,用来执行一些清理操作。不过目前好像没有在哪个ServerApp的子类里看到重载这个方法的情况。
至于interface_.prepareForShutdown方法,主要任务是用来调用mainDispatcher.prepareforshutdown,然后通过finaliseRequestManager方法来清理请求管理器,这样就不再处理任何网络请求:
/**
* This method prepares this NetworkInterface for being shut down.
*/
void NetworkInterface::prepareForShutdown()
{
if (pMainDispatcher_)
{
pMainDispatcher_->prepareForShutdown();
}
this->finaliseRequestManager();
}
/**
* This method destroys the RequestManager. It should be called when shutting
* down to ensure that no ReplyMessageHandler instances are cancelled when the
* server is in a bad state (e.g. after the App has been destroyed.)
*/
void NetworkInterface::finaliseRequestManager()
{
this->interfaceTable().serve( InterfaceElement::REPLY, NULL );
RequestManager * pRequestManager = pRequestManager_;
pRequestManager_ = NULL;
bw_safe_delete( pRequestManager );
}
在EventDispatcher::prepareForShutdown方法中,会重新使用循环来调用processOnce方法,处理退出阶段的所有逻辑,这里设置了处理的最大超时时间为2s,避免业务逻辑出错引发退出失败:
void EventDispatcher::prepareForShutdown()
{
const uint64 SECONDS_TO_ATTEMPT_SEND = 2;
const uint64 startTime = timestamp();
uint64 timePeriod = (stampsPerSecond() * SECONDS_TO_ATTEMPT_SEND);
while (!pPoller_->isReadyForShutdown() &&
(timestamp() - startTime < timePeriod))
{
this->processOnce();
// Wait 100ms
#if defined( PLAYSTATION3 )
sys_timer_usleep( 100000 );
#elif !defined( _WIN32 )
usleep( 100000 );
#else
Sleep( 100 );
#endif
}
}
当这些流程都走完之后,就开始调用各个ServerApp的析构函数,虽然这个基类上的析构函数是一个空实现,各个子类里的重载内容就十分丰富了,简单点的就是LoginApp,在析构函数里关闭对外开放的网络端口extInterface:
/**
* Destructor.
*/
LoginApp::~LoginApp()
{
this->extInterface().prepareForShutdown();
statsTimer_.cancel();
tickTimer_.cancel();
}
在DBApp的析构函数里会执行数据库的断开操作:
/**
* Destructor.
*/
DBApp::~DBApp()
{
interface_.cancelRequestsFor( &baseAppMgr_.channel() );
statusCheckTimer_.cancel();
gameTimer_.cancel();
bw_safe_delete( pBillingSystem_ );
pBillingSystem_ = NULL;
bw_safe_delete( pDatabase_ );
// Destroy entity descriptions before calling Script::fini() so that it
// can clean up any PyObjects that it may have.
bw_safe_delete( pEntityDefs_ );
DataType::clearStaticsForReload();
// Now done in ~ScriptApp
// Script::fini();
}
这样就完成了有序退出shutDown流程。
至于这个shutDown接口什么时候被调用,则根据具体的ServerApp子类来管理,最简单的就是将这个接口暴露给Python,让业务逻辑来决定什么时候当前进程应该退出,例如在CellApp和BaseApp里就提供了shutDownApp方法给Python调用:
// server\baseapp\script_bigworld.cpp
/*~ function BigWorld.shutDownApp
* @components{ base }
*
* This method induce a shutdown of this BaseApp.
*/
void shutDownApp()
{
BaseApp::instance().shutDown();
}
PY_AUTO_MODULE_FUNCTION( RETVOID, shutDownApp, END, BigWorld )
// server\cellapp\cellapp.cpp
/*~ function BigWorld.shutDownApp
* @components{ cell }
*
* This method induce a controlled shutdown of this CellApp.
*/
void shutDownApp()
{
CellApp::instance().shutDown();
}
PY_AUTO_MODULE_FUNCTION( RETVOID, shutDownApp, END, BigWorld )
基于retire的退出
如果要一个ServerApp执行安全退出,上述的shutDownApp是一个非常方便的方法。此外还有一种方法可以触发ServerApp::shutDown方法来执行安全的退出逻辑,这个方法是通过BWMachineD发送的retireApp指令。每个ServerApp在启动时都会通过ServerApp::addWatchers注册一个command/retireXXX指令的监听,这里的XXX是当前进程的在集群里的名字AppName:当BWMachineD发送retireApp指令时,ServerApp会收到这个指令,然后执行安全退出逻辑。
/**
* This method adds the watchers associated with this class.
*/
void ServerApp::addWatchers( Watcher & watcher )
{
watcher.addChild( "nub",
Mercury::NetworkInterface::pWatcher(), &interface_ );
if (this->pManagerAppGateway())
{
char buf[ 256 ];
snprintf( buf, sizeof( buf ), "command/retire%s",
this->getAppName() );
watcher.addChild( buf, new RetireAppCommand( *this ) );
}
}
class RetireAppCommand : public NoArgCallableWatcher
{
public:
RetireAppCommand( ServerApp & app ) :
// TODO: BWT-29273 Enable DBApp watcher forwarding
NoArgCallableWatcher( strcmp( app.getAppName(), "DBApp" ) == 0 ?
CallableWatcher::LOCAL_ONLY : CallableWatcher::LEAST_LOADED,
"Retire the least loaded app." ),
app_( app )
{
}
private:
virtual bool onCall( BW::string & output, BW::string & value )
{
INFO_MSG( "Requesting to retire this app.\n" );
app_.requestRetirement();
return true;
}
ServerApp & app_;
};
当ServerApp接收到了command/retireXXX指令后,会调用ServerApp::requestRetirement方法来请求退休。在ServerApp::requestRetirement方法里,会判断当前进程是否有ManagerAppGateway,如果有,就会调用ManagerAppGateway::retireApp方法来发送退休请求。
/**
* This method requests that this application should be retired.
*/
void ServerApp::requestRetirement()
{
if (!this->pManagerAppGateway())
{
ERROR_MSG( "ServerApp::requestRetirement: "
"%s has no manager app gateway configured\n",
this->getAppName() );
return;
}
this->pManagerAppGateway()->retireApp();
}
void ManagerAppGateway::retireApp()
{
Mercury::Bundle & bundle = channel_.bundle();
bundle.startMessage( retireAppIE_ );
channel_.send();
}
这个retireAppIE_是在ManagerAppGateway初始化的时候就设置好了的,CellAppMgrGateway、BaseAppMgrGateway和DBAppMgrGateway都是ManagerAppGateway的子类,在构造函数里就初始化好了retireAppIE_。
ManagerAppGateway::ManagerAppGateway( Mercury::NetworkInterface & networkInterface,
const Mercury::InterfaceElement & retireAppIE ) :
channel_( networkInterface, Mercury::Address::NONE ),
retireAppIE_( retireAppIE )
{
MF_ASSERT( retireAppIE_.lengthStyle() == Mercury::FIXED_LENGTH_MESSAGE );
MF_ASSERT( retireAppIE_.lengthParam() == 0 );
}
/**
* The constructor for CellAppMgrGateway.
*/
CellAppMgrGateway::CellAppMgrGateway( Mercury::NetworkInterface & interface ) :
ManagerAppGateway( interface, CellAppMgrInterface::retireApp )
{}
BaseAppMgrGateway::BaseAppMgrGateway( Mercury::NetworkInterface & interface ) :
ManagerAppGateway( interface, BaseAppMgrInterface::retireApp )
{
}
/**
* The constructor for DBAppMgrGateway.
*/
DBAppMgrGateway::DBAppMgrGateway( Mercury::NetworkInterface & interface ) :
ManagerAppGateway( interface, DBAppMgrInterface::retireApp )
{}
这三个AppMgr处理retireApp指令的方式基本一样,这里只分析CellAppMgr里对应的处理逻辑,这个retireApp的指令会发送到CellAppMgr里创建的对应的CellApp对象:
/**
* This method is called by the cell application when it wants to be shut down.
*/
void CellApp::retireApp()
{
INFO_MSG( "CellApp::retireApp: CellApp %u is requesting to retire.\n",
this->id() );
this->startRetiring();
}
/**
* This method starts the process of shutting down the application
* associated with this object.
*/
void CellApp::startRetiring()
{
INFO_MSG( "CellApp::startRetiring: Retiring CellApp %u - %s\n",
id_, this->addr().c_str() );
isRetiring_ = true;
if (!cells_.empty())
{
this->retireAllCells();
}
else
{
this->shutDown();
}
}
/**
* This method shuts down the cell application associated with this object.
*/
void CellApp::shutDown()
{
CellAppInterface::shutDownArgs args;
args.isSigInt = false; // Not used.
this->bundle() << args;
this->send();
}
CellApp接收到这个retireApp指令后,会调用CellApp::startRetiring方法来启动退休过程。在CellApp::startRetiring方法里,会判断当前CellApp是否管理着cells(游戏区域),如果有,就会调用CellApp::retireAllCells方法来迁移cells到其他CellApp,如果没有,就会调用CellApp::shutDown方法来退出进程。
当CellApp管理的cells都迁移到其他CellApp后,如果发现当前已经设置好了retire标记位,则也会调用CellApp::shutDown方法来发送RPC通知对应的CellApp进程去退出:
/**
* This method starts retiring all cells from this app.
*/
void CellApp::retireAllCells()
{
cells_.retireAll();
}
/**
* This method erases the input cell from this application.
*/
void CellApp::eraseCell( CellData * pCellData )
{
cells_.erase( pCellData );
if (isRetiring_ && cells_.empty())
{
this->shutDown();
}
}
当CellApp进程收到shutDown指令后,会调用CellApp::shutDown方法来退出安全退出:
/**
* This method handles a shutDown message.
*/
void CellApp::shutDown( const CellAppInterface::shutDownArgs & /*args*/ )
{
TRACE_MSG( "CellApp::shutDown: Told to shut down\n" );
this->shutDown();
}
集群的有序退出
上面暴露的shutDownApp方法只会触发当前进程的退出,如果要当前服务器集群里的所有进程都有序退出,一个一个的去通知shutDownApp不太灵活,因此在CellApp和BaseApp这两个能绑定Python的ServerApp子类里,提供了一个controlledShutDown方法,用来触发有序退出流程:
// server\baseapp\script_bigworld.cpp
/*~ function BigWorld.controlledShutDown
* @components{ base }
*
* This method induces a controlled shutdown of the cluster.
*/
void controlledShutDown()
{
BaseApp::instance().triggerControlledShutDown();
}
PY_AUTO_MODULE_FUNCTION( RETVOID, controlledShutDown, END, BigWorld )
// server\cellapp\cellapp.cpp
/*~ function BigWorld.controlledShutDown
* @components{ cell }
*
* This method induces a controlled shutdown of the cluster.
*/
void controlledShutDown()
{
CellApp::instance().triggerControlledShutDown();
}
PY_AUTO_MODULE_FUNCTION( RETVOID, controlledShutDown, END, BigWorld )
这两个不同的ServerApp执行controlledShutDown方法的后续流程大同小异。这里先分析一下CellApp的triggerControlledShutDown方法:
/**
* This method triggers a controlled shutdown of the cluster.
*/
void CellApp::triggerControlledShutDown()
{
CellAppMgrGateway & cellAppMgr = this->cellAppMgr();
Mercury::Bundle & bundle = cellAppMgr.bundle();
CellAppMgrInterface::controlledShutDownArgs &args =
CellAppMgrInterface::controlledShutDownArgs::start( bundle );
args.stage = SHUTDOWN_TRIGGER;
cellAppMgr.send();
}
这个函数会向CellAppMgr发送一个controlledShutDown消息,携带的stage参数为SHUTDOWN_TRIGGER,CellAppMgr收到这个消息之后,会执行自己的triggerControlledShutDown方法:
/**
* This method handles a message to start shutting down in a controlled way.
*/
void CellAppMgr::controlledShutDown(
const CellAppMgrInterface::controlledShutDownArgs & args )
{
INFO_MSG( "CellAppMgr::controlledShutDown: stage = %s\n",
ServerApp::shutDownStageToString( args.stage ) );
switch (args.stage)
{
case SHUTDOWN_TRIGGER:
this->triggerControlledShutDown();
break;
s
case SHUTDOWN_REQUEST:
this->controlledShutDown();
break;
default:
ERROR_MSG( "CellAppMgr::controlledShutDown: "
"Stage %s not handled.\n",
ServerApp::shutDownStageToString( args.stage ) );
break;
}
}
这个CellAppMgr::triggerControlledShutDown负责将SHUTDOWN_TRIGGER消息发送给BaseAppMgr:
/**
* This method triggers a controlled shutdown of the cluster.
*/
void CellAppMgr::triggerControlledShutDown()
{
Mercury::Bundle & bundle = baseAppMgr_.bundle();
BaseAppMgrInterface::controlledShutDownArgs &args =
BaseAppMgrInterface::controlledShutDownArgs::start( bundle );
args.stage = SHUTDOWN_TRIGGER;
args.shutDownTime = 0; // unused on receiving side
baseAppMgr_.send();
}
BaseAppMgr收到这个消息之后,会执行自己的triggerControlledShutDown方法:
/**
* This method responds to a message telling us what stage of the controlled
* shutdown process the server is at.
*/
void BaseAppMgr::controlledShutDown(
const BaseAppMgrInterface::controlledShutDownArgs & args )
{
INFO_MSG( "BaseAppMgr::controlledShutDown: stage = %s\n",
ServerApp::shutDownStageToString( args.stage ) );
switch (args.stage)
{
// 省略一些代码
case SHUTDOWN_TRIGGER:
{
this->controlledShutDownServer();
break;
}
case SHUTDOWN_NONE:
case SHUTDOWN_DISCONNECT_PROXIES:
break;
}
}
类似的,如果是BaseApp发起的triggerControlledShutDown方法,会向BaseAppMgr发送一个controlledShutDown消息,携带的stage参数为SHUTDOWN_TRIGGER,此时也会调用到BaseAppMgr::controlledShutDownServer方法:
/**
* This method triggers a controlled shutdown of the cluster.
*/
void BaseApp::triggerControlledShutDown()
{
BaseAppMgrGateway & baseAppMgr = this->baseAppMgr();
Mercury::Bundle & bundle = baseAppMgr.bundle();
BaseAppMgrInterface::controlledShutDownArgs &args =
BaseAppMgrInterface::controlledShutDownArgs::start( bundle );
args.stage = SHUTDOWN_TRIGGER;
args.shutDownTime = 0; // unused on receiving side
baseAppMgr.send();
}
所以不管是CellApp还是BaseApp发起的triggerControlledShutDown方法,最终都会调用到BaseAppMgr::controlledShutDownServer方法。在BaseAppMgr::controlledShutDownServer里,会将controlledShutDown请求发送到DbAppMgr和LoginApp上,然后以SHUTDOWN_REQUEST消息的形式重新执行controlledShutDown方法:
/**
* Trigger a controlled shutdown of the entire server.
*/
void BaseAppMgr::controlledShutDownServer()
{
if (shutDownStage_ != SHUTDOWN_NONE)
{
DEBUG_MSG( "BaseAppMgr::controlledShutDownServer: "
"Already shutting down, ignoring duplicate shutdown request.\n" );
return;
}
// First try to trigger controlled shutdown via the loginapp
Mercury::Address loginAppAddr;
Mercury::Reason reason = Mercury::MachineDaemon::findInterface(
"LoginIntInterface", -1, loginAppAddr );
if (reason == Mercury::REASON_SUCCESS)
{
Mercury::ChannelSender sender( BaseAppMgr::getChannel( loginAppAddr ) );
Mercury::Bundle & bundle = sender.bundle();
bundle.startMessage( LoginIntInterface::controlledShutDown );
INFO_MSG( "BaseAppMgr::controlledShutDownServer: "
"Triggering server shutdown via LoginApp @ %s\n",
loginAppAddr.c_str() );
return;
}
else
{
ERROR_MSG( "BaseAppMgr::controlledShutDownServer: "
"Couldn't find a LoginApp to trigger server shutdown\n" );
}
// Next try to trigger shutdown via the DBApp.
// TODO: Scalable DB. Talk directly to DBAppMgr, instead of DBApp. DBApp
// will forward to DBAppMgr currently.
if (dbAppAlpha_.channel().isEstablished())
{
Mercury::Bundle & bundle = dbAppAlpha_.bundle();
DBAppInterface::controlledShutDownArgs::start( bundle ).stage =
SHUTDOWN_REQUEST;
dbAppAlpha_.send();
INFO_MSG( "BaseAppMgr::controlledShutDownServer: "
"Triggering server shutdown via the Alpha DBApp%02d (%s)\n",
dbApps_.alpha().id(),
dbAppAlpha_.addr().c_str() );
return;
}
else
{
ERROR_MSG( "BaseAppMgr::controlledShutDownServer: "
"Couldn't find the DBApp to trigger server shutdown\n" );
}
// Alright, the shutdown starts with me then
BaseAppMgrInterface::controlledShutDownArgs args;
args.stage = SHUTDOWN_REQUEST;
INFO_MSG( "BaseAppMgr::controlledShutDownServer: "
"Starting controlled shutdown here (no LoginApp or DBApp found)\n" );
this->controlledShutDown( args );
}
在CellAppMgr::controlledShutDown里,遇到SHUTDOWN_REQUEST消息时,会将这个controlledShutDown请求发送到CellAppMgr上。当CellAppMgr接收到这个SHUTDOWN_REQUEST消息时,会执行自己的controlledShutDown方法,开启自身的退出流程:
/**
* This method handles a message to start shutting down in a controlled way.
*/
void CellAppMgr::controlledShutDown(
const CellAppMgrInterface::controlledShutDownArgs & args )
{
INFO_MSG( "CellAppMgr::controlledShutDown: stage = %s\n",
ServerApp::shutDownStageToString( args.stage ) );
switch (args.stage)
{
case SHUTDOWN_TRIGGER:
this->triggerControlledShutDown();
break;
case SHUTDOWN_REQUEST:
this->controlledShutDown();
break;
default:
ERROR_MSG( "CellAppMgr::controlledShutDown: "
"Stage %s not handled.\n",
ServerApp::shutDownStageToString( args.stage ) );
break;
}
}
/**
* This method initiates a controlled shutdown of the system.
*/
void CellAppMgr::controlledShutDown()
{
// Stop sending to anonymous channels etc.
interface_.stopPingingAnonymous();
ShutDownHandler::start( *this );
}
这里的ShutDownHandler::start的作用是通知当前CellAppMgr管理的所有CellApp和BaseAppMgr开始执行退出流程controlledShutDown,此时的stage为SHUTDOWN_INFORM:
/**
* This method starts the process of shutting down the server.
*/
void ShutDownHandler::start( CellAppMgr & mgr )
{
// No delay if we haven't started yet since game time won't go forward.
int shuttingDownDelay = 0;
if (mgr.hasStarted())
{
shuttingDownDelay = CellAppMgrConfig::shuttingDownDelayInTicks();
}
mgr.setShutDownHandler(
new InformHandler( mgr, mgr.time() + shuttingDownDelay ) );
}
InformHandler::InformHandler( CellAppMgr & mgr, GameTime shutDownTime ) :
mgr_( mgr ),
shutDownTime_( shutDownTime ),
baseAppsAcked_( false ),
ackedCellApps_()
{
// Inform the BaseAppMgr.
{
Mercury::Bundle & bundle = mgr_.baseAppMgr().bundle();
BaseAppMgrInterface::controlledShutDownArgs args;
args.stage = SHUTDOWN_INFORM;
args.shutDownTime = shutDownTime_;
bundle << args;
mgr_.baseAppMgr().send();
}
// Inform all of the CellApps.
mgr_.cellApps().controlledShutDown( SHUTDOWN_INFORM, shutDownTime_ );
}
/**
*
*/
void CellApps::controlledShutDown( ShutDownStage stage, GameTime shutDownTime )
{
Map::iterator iter = map_.begin();
while (iter != map_.end())
{
iter->second->controlledShutDown( stage, shutDownTime );
++iter;
}
}
/**
* This method shuts down the cell application associated with this object in
* a controlled way.
*/
void CellApp::controlledShutDown( ShutDownStage stage, GameTime shutDownTime )
{
CellAppInterface::controlledShutDownArgs args;
args.stage = stage;
args.shutDownTime = shutDownTime;
this->bundle() << args;
this->send();
}
在BaseAppMgr::controlledShutDown方法中,当stage为SHUTDOWN_INFORM时,会通知当前管理的所有BaseApp开启退出流程:
/**
* This method responds to a message telling us what stage of the controlled
* shutdown process the server is at.
*/
void BaseAppMgr::controlledShutDown(
const BaseAppMgrInterface::controlledShutDownArgs & args )
{
INFO_MSG( "BaseAppMgr::controlledShutDown: stage = %s\n",
ServerApp::shutDownStageToString( args.stage ) );
switch (args.stage)
{
case SHUTDOWN_REQUEST:
{
if (cellAppMgr_.channel().isEstablished())
{
Mercury::Bundle & bundle = cellAppMgr_.bundle();
CellAppMgrInterface::controlledShutDownArgs args;
args.stage = SHUTDOWN_REQUEST;
bundle << args;
cellAppMgr_.send();
}
break;
}
case SHUTDOWN_INFORM:
{
shutDownStage_ = args.stage;
shutDownTime_ = args.shutDownTime;
if (baseAndServiceApps_.empty())
{
this->ackBaseAppsShutDownToCellAppMgr( args.stage );
}
else
{
// Inform all base apps, once the requests are complete the
// CellAppMgr is notified via ackBaseAppsShutDownToCellAppMgr().
this->informBaseAppsOfShutDown( args );
}
break;
}
case SHUTDOWN_PERFORM:
{
this->startAsyncShutDownStage( SHUTDOWN_DISCONNECT_PROXIES );
break;
}
case SHUTDOWN_TRIGGER:
{
this->controlledShutDownServer();
break;
}
case SHUTDOWN_NONE:
case SHUTDOWN_DISCONNECT_PROXIES:
break;
}
}
/**
* This method informs base and service apps of the controlled shutdown process.
* @param args Shutdown stage and time.
*/
void BaseAppMgr::informBaseAppsOfShutDown(
const BaseAppMgrInterface::controlledShutDownArgs & args )
{
MF_ASSERT( !baseAndServiceApps_.empty() );
SyncControlledShutDownHandler * pHandler = new SyncControlledShutDownHandler(
args.stage, baseAndServiceApps_.size() );
MemoryOStream payload;
payload << args.stage << args.shutDownTime;
sendToBaseApps( BaseAppIntInterface::controlledShutDown, payload,
BaseAppsIterator( baseAndServiceApps_ ), pHandler );
}
此时BaseAppMgr和CellAppMgr的shutDownStage_都被设置为SHUTDOWN_INFORM,同时所有的CellApp和BaseApp都接收到了controlledShutDown方法,且stage为SHUTDOWN_INFORM。
对应的处理逻辑都类似,记录当前的shutDownTime_和shutDownReplyID_,并触发onAppShuttingDown事件:
/**
* This method is called by the BaseAppMgr to tell us to shut down.
*/
void BaseApp::controlledShutDown( const Mercury::Address& srcAddr,
const Mercury::UnpackedMessageHeader& header,
BinaryIStream & data )
{
ShutDownStage stage;
GameTime shutDownTime;
data >> stage >> shutDownTime;
INFO_MSG( "BaseApp::controlledShutDown: stage = %s\n",
ServerApp::shutDownStageToString( stage ) );
switch (stage)
{
case SHUTDOWN_INFORM:
{
// Make sure that we no longer process the external nub.
extInterface_.detach();
shutDownTime_ = shutDownTime;
shutDownReplyID_ = header.replyID;
if (this->hasStarted())
{
this->scriptEvents().triggerTwoEvents( "onAppShuttingDown",
isServiceApp_ ?
"onServiceAppShuttingDown" :
"onBaseAppShuttingDown",
Py_BuildValue( "(d)",
(double)shutDownTime/Config::updateHertz() ) );
// Don't send reply immediate to allow scripts to do some stuff.
}
else
{
baseAppMgr_.bundle().startReply( shutDownReplyID_ );
baseAppMgr_.send();
}
}
// 省略其他分支的处理
}
}
/**
* This method handles a message telling us to shut down in a controlled way.
*/
void CellApp::controlledShutDown(
const CellAppInterface::controlledShutDownArgs & args )
{
switch (args.stage)
{
case SHUTDOWN_INFORM:
{
if (shutDownTime_ != 0)
{
ERROR_MSG( "CellApp::controlledShutDown: "
"Setting shutDownTime_ to %u when already set to %u\n",
args.shutDownTime, shutDownTime_ );
}
shutDownTime_ = args.shutDownTime;
hasAckedCellAppMgrShutDown_ = false;
if (this->hasStarted())
{
this->scriptEvents().triggerTwoEvents(
"onAppShuttingDown", "onCellAppShuttingDown",
Py_BuildValue( "(d)",
(double)shutDownTime_/Config::updateHertz() ) );
// Don't send ack immediately to allow scripts to do stuff.
}
else
{
this->sendShutdownAck( SHUTDOWN_INFORM );
}
}
break;
case SHUTDOWN_PERFORM:
{
ERROR_MSG( "CellApp::controlledShutDown: "
"CellApp does not do SHUTDOWN_PERFORM stage.\n" );
// TODO: It could be good to call this so that we can call a script
// method.
break;
}
case SHUTDOWN_NONE:
case SHUTDOWN_REQUEST:
case SHUTDOWN_DISCONNECT_PROXIES:
case SHUTDOWN_TRIGGER:
break;
}
}
这两个事件都会发送到Python脚本里去处理,然后在下一帧里的计时器操作中会检查是设置了shutDownTime_,如果设置了就会发送这个退出消息的ACK确认消息到对应的MgrApp里:
/**
* This method handles timeout events.
*/
void BaseApp::handleTimeout( TimerHandle /*handle*/, void * arg )
{
uintptr timerType = reinterpret_cast<uintptr>( arg );
// TODO: Should investigate whether all this can be done in tickGameTime()
// instead, since we only seem to start a timer with TIMEOUT_GAME_TICK.
// Secondary database is used even during shutdown
if (pSqliteDB_ && (timerType == TIMEOUT_GAME_TICK))
{
pArchiver_->tickSecondaryDB( pSqliteDB_ );
}
bgTaskManager_.tick();
if (this->inShutDownPause())
{
if (shutDownReplyID_ != Mercury::REPLY_ID_NONE)
{
baseAppMgr_.bundle().startReply( shutDownReplyID_ );
baseAppMgr_.send();
// No longer regularly sending the load from now on.
baseAppMgr_.isRegular( false );
shutDownReplyID_ = Mercury::REPLY_ID_NONE;
}
return;
}
switch (timerType)
{
case TIMEOUT_GAME_TICK:
this->tickGameTime();
break;
}
}
/**
*
*/
void CellApp::tickShutdown()
{
if (!hasAckedCellAppMgrShutDown_)
{
this->sendShutdownAck( SHUTDOWN_INFORM );
// No longer regularly sending the load from now on.
cellAppMgr_.isRegular( false );
hasAckedCellAppMgrShutDown_ = true;
}
}
/**
* This method sends an ack to the CellAppMgr to indicate that we've
* finished one of our shutdown stages.
*/
void CellApp::sendShutdownAck( ShutDownStage stage )
{
cellAppMgr_.ackShutdown( stage );
}
void CellAppMgrGateway::ackShutdown( ShutDownStage stage )
{
Mercury::Bundle & bundle = channel_.bundle();
CellAppMgrInterface::ackCellAppShutDownArgs & rAckCellAppShutDown =
CellAppMgrInterface::ackCellAppShutDownArgs::start( bundle );
rAckCellAppShutDown.stage = stage;
channel_.send();
}
当对应的MgrApp收到了所有的子ServerApp的SHUTDOWN_INFORM的确认消息后,才能开启后续的SHUTDOWN_PERFORM阶段的关闭流程。
BaseAppMgr处理ACK的地方则是在SyncControlledShutDownHandler::handleMessage里,每收到一个ACK就执行一次decCount,当这个count_字段为0时,就代表所有的子BaseApp都已经ACK完毕,就会去执行ackBaseAppsShutDownToCellAppMgr来处理下一个阶段的关闭流程:
SyncControlledShutDownHandler::SyncControlledShutDownHandler(
ShutDownStage stage, int count ) :
stage_( stage ),
count_( count )
{
MF_ASSERT( count_ > 0 );
}
/**
* This method handles the reply message.
*/
void SyncControlledShutDownHandler::handleMessage(
const Mercury::Address & srcAddr,
Mercury::UnpackedMessageHeader & header,
BinaryIStream & data, void * )
{
this->decCount();
}
/**
* This method finalises and deletes this handler.
*/
void SyncControlledShutDownHandler::finalise()
{
BaseAppMgr * pApp = BaseAppMgr::pInstance();
if (pApp)
{
pApp->ackBaseAppsShutDownToCellAppMgr( stage_ );
}
delete this;
}
/**
* This method decrements the number of outstanding requests.
*/
void SyncControlledShutDownHandler::decCount()
{
--count_;
if (count_ == 0)
{
this->finalise();
}
}
而这里的ackBaseAppsShutDownToCellAppMgr作用就是通知CellAppMgr当前所有的BaseApp都已经ACK完毕,对应的处理函数还是在之前的InformHandler里,这里会设置成员变量baseAppsAcked_为true:
/**
* This method is called to inform CellAppMgr that the base apps are in a
* particular shutdown stage.
* @param stage Shutdown stage.
*/
void BaseAppMgr::ackBaseAppsShutDownToCellAppMgr( ShutDownStage stage )
{
DEBUG_MSG( "BaseAppMgr::ackBaseAppsShutDownToCellAppMgr: "
"All BaseApps have shut down, informing CellAppMgr\n" );
Mercury::Bundle & bundle = this->cellAppMgr().bundle();
CellAppMgrInterface::ackBaseAppsShutDownArgs & rAckBaseAppsShutDown =
CellAppMgrInterface::ackBaseAppsShutDownArgs::start( bundle );
rAckBaseAppsShutDown.stage = stage;
this->cellAppMgr().send();
}
/**
* This method is called to acknowledge that the base apps are in a particular
* shutdown stage.
*/
void CellAppMgr::ackBaseAppsShutDown(
const CellAppMgrInterface::ackBaseAppsShutDownArgs & args )
{
if (pShutDownHandler_)
{
pShutDownHandler_->ackBaseApps( args.stage );
}
}
void InformHandler::ackBaseApps( ShutDownStage stage )
{
if (stage != SHUTDOWN_INFORM)
{
ERROR_MSG( "InformHandler::ackBaseApps: Incorrect stage %s\n",
ServerApp::shutDownStageToString( stage ) );
}
baseAppsAcked_ = true;
this->checkStatus();
}
CellAppMgr处理ACK的地方在InformHandler::ackBaseApps,每个确认了ACK的CellApp都会放到ackedCellApps_这个集合里,当集合的大小大于等于当前管理的CellApp的大小且baseAppsAcked_为true时,就代表所有的BaseApp和所有的CellApp都接收到了SHUTDOWN_INFORM,接下来会去执行PerformBaseAppsHandler,这里会使用writeSpacesToDB将所有的Space数据进行存库,同时往BaseAppMgr发送controlledShutDown通知可以切换到SHUTDOWN_PERFORM了:
void InformHandler::checkStatus()
{
INFO_MSG( "InformHandler::checkStatus: "
"baseAppsAcked_ = %d. cells %" PRIzu "/%d shutDownTime %u time %u\n",
baseAppsAcked_,
ackedCellApps_.size(), mgr_.numCellApps(),
shutDownTime_, mgr_.time() );
if (this->isPaused() &&
baseAppsAcked_ &&
ackedCellApps_.size() >= size_t(mgr_.numCellApps()))
{
// TODO: Could check that the correct ones are ack'ed.
// mgr_.setShutDownHandler( new PerformCellAppHandler( mgr_ ) );
mgr_.setShutDownHandler( new PerformBaseAppsHandler( mgr_ ) );
delete this;
}
}
void InformHandler::ackCellApp( ShutDownStage stage, CellApp & app )
{
if (stage != SHUTDOWN_INFORM)
{
ERROR_MSG( "InformHandler::ackCellApp: Incorrect stage %s\n",
ServerApp::shutDownStageToString( stage ) );
}
ackedCellApps_.insert( &app );
this->checkStatus();
}
PerformBaseAppsHandler::PerformBaseAppsHandler( CellAppMgr & mgr ) : mgr_( mgr )
{
CellAppMgr & app = CellAppMgr::instance();
app.writeGameTimeToDB();
app.writeSpacesToDB();
// Tell the BaseAppMgr to perform.
{
BaseAppMgrInterface::controlledShutDownArgs args;
args.stage = SHUTDOWN_PERFORM;
args.shutDownTime = 0;
Mercury::Bundle & bundle = app.baseAppMgr().bundle();
bundle << args;
app.baseAppMgr().send();
}
}
/**
* This method writes space information to the database.
*/
void CellAppMgr::writeSpacesToDB()
{
if (Config::archivePeriodInTicks() == 0 ||
!Config::shouldArchiveSpaceData())
{
return;
}
if (this->dbAppAlpha().channel().isEstablished())
{
Mercury::Bundle & bundle = this->dbAppAlpha().bundle();
bundle.startMessage( DBAppInterface::writeSpaces );
bundle << uint32( spaces_.size() );
Spaces::const_iterator iter = spaces_.begin();
while (iter != spaces_.end())
{
iter->second->sendToDB( bundle );
++iter;
}
this->dbAppAlpha().send();
}
else
{
WARNING_MSG( "CellAppMgr::writeSpacesToDB: "
"No known DBApp, not writing to DB.\n" );
}
}
当BaseAppMgr::controlledShutDown处理SHUTDOWN_PERFORM时,会触发BaseAppMgr::startAsyncShutDownStage,这里收集当前所有的BaseApp的地址,使用AsyncControlledShutDownHandler来发送controlledShutDown通知,并等待这些BaseApp的ACK:
/**
* This method responds to a message telling us what stage of the controlled
* shutdown process the server is at.
*/
void BaseAppMgr::controlledShutDown(
const BaseAppMgrInterface::controlledShutDownArgs & args )
{
INFO_MSG( "BaseAppMgr::controlledShutDown: stage = %s\n",
ServerApp::shutDownStageToString( args.stage ) );
switch (args.stage)
{
// 省略之前已经介绍过的两个分支
case SHUTDOWN_PERFORM:
{
this->startAsyncShutDownStage( SHUTDOWN_DISCONNECT_PROXIES );
break;
}
case SHUTDOWN_TRIGGER:
{
this->controlledShutDownServer();
break;
}
case SHUTDOWN_NONE:
case SHUTDOWN_DISCONNECT_PROXIES:
break;
}
}
void BaseAppMgr::startAsyncShutDownStage( ShutDownStage stage )
{
BW::vector< Mercury::Address > addrs;
addrs.reserve( baseAndServiceApps_.size() );
BaseApps::const_iterator iter = baseAndServiceApps_.begin();
while (iter != baseAndServiceApps_.end())
{
addrs.push_back( iter->first );
++iter;
}
// This object deletes itself.
new AsyncControlledShutDownHandler( stage, addrs );
}
这里的AsyncControlledShutDownHandler会遍历初始时传入的BaseApp地址集合,发送一个controlledShutDown通知,此时的stage为SHUTDOWN_DISCONNECT_PROXIES:
AsyncControlledShutDownHandler::AsyncControlledShutDownHandler(
ShutDownStage stage, BW::vector< Mercury::Address > & addrs ) :
stage_( stage ),
numToSend_( 0 )
{
addrs_.swap( addrs );
this->sendNext();
}
void AsyncControlledShutDownHandler::sendNext()
{
bool shouldDeleteThis = true;
BaseAppMgr * pApp = BaseAppMgr::pInstance();
if (pApp)
{
while (true)
{
if (numToSend_ < int(addrs_.size()))
{
Mercury::ChannelOwner * pChannelOwner =
pApp->findChannelOwner( addrs_[ numToSend_ ] );
if (pChannelOwner != NULL &&
!pChannelOwner->channel().hasRemoteFailed())
{
Mercury::Bundle & bundle = pChannelOwner->bundle();
bundle.startRequest(
BaseAppIntInterface::controlledShutDown, this );
shouldDeleteThis = false;
bundle << stage_;
bundle << 0;
pChannelOwner->send();
}
else
{
WARNING_MSG( "AsyncControlledShutDownHandler::sendNext: "
"Could not find channel for %s\n",
addrs_[ numToSend_ ].c_str() );
++numToSend_;
continue;
}
++numToSend_;
}
else if (stage_ == SHUTDOWN_DISCONNECT_PROXIES)
{
// This object deletes itself.
new AsyncControlledShutDownHandler( SHUTDOWN_PERFORM, addrs_ );
}
else
{
Mercury::Bundle & bundle = pApp->cellAppMgr().bundle();
CellAppMgrInterface::ackBaseAppsShutDownArgs &
rAckBaseAppsShutDown =
CellAppMgrInterface::ackBaseAppsShutDownArgs::start(
bundle );
rAckBaseAppsShutDown.stage = stage_;
pApp->cellAppMgr().send();
pApp->shutDown( false );
}
break;
}
}
if (shouldDeleteThis)
{
delete this;
}
}
这个SHUTDOWN_DISCONNECT_PROXIES的作用是通知当前BaseApp上的所有Proxy使用onClientDeath( CLIENT_DISCONNECT_SHUTDOWN )去强制断开客户端的连接,也就是常说的关服封网:
case SHUTDOWN_DISCONNECT_PROXIES:
{
if (this->hasStarted())
{
this->callShutDownCallback( 0 );
// TODO: Should probably spread this out over time.
typedef BW::vector< SmartPointer< Proxy > > CopiedProxies;
CopiedProxies copyOfProxies;
{
copyOfProxies.reserve( proxies_.size() );
Proxies::iterator iter = proxies_.begin();
while (iter != proxies_.end())
{
copyOfProxies.push_back( iter->second );
++iter;
}
}
{
CopiedProxies::iterator iter = copyOfProxies.begin();
while (iter != copyOfProxies.end())
{
(*iter)->onClientDeath( CLIENT_DISCONNECT_SHUTDOWN );
++iter;
}
}
}
IF_NOT_MF_ASSERT_DEV( baseAppMgr_.addr() == srcAddr )
{
break;
}
baseAppMgr_.bundle().startReply( header.replyID );
baseAppMgr_.send();
break;
}
当所有的Proxy都被通知断开连接后,SHUTDOWN_DISCONNECT_PROXIES阶段就完成了,开始发送消息到BaseAppMgr来通知此阶段结束,然后BaseAppMgr的AsyncControlledShutDownHandler就会切换到SHUTDOWN_PERFORM阶段:
void AsyncControlledShutDownHandler::handleMessage(
const Mercury::Address & srcAddr,
Mercury::UnpackedMessageHeader & header,
BinaryIStream & data, void * )
{
DEBUG_MSG( "AsyncControlledShutDownHandler::handleMessage: "
"BaseApp %s has finished stage %s\n",
srcAddr.c_str(), ServerApp::shutDownStageToString( stage_ ) );
if (stage_ == SHUTDOWN_PERFORM)
{
BaseAppMgr * pApp = BaseAppMgr::pInstance();
pApp->removeControlledShutdownBaseApp( srcAddr );
}
this->sendNext();
}
void AsyncControlledShutDownHandler::sendNext()
{
bool shouldDeleteThis = true;
BaseAppMgr * pApp = BaseAppMgr::pInstance();
if (pApp)
{
while (true)
{
if (numToSend_ < int(addrs_.size()))
{
// 省略一些代码
}
else if (stage_ == SHUTDOWN_DISCONNECT_PROXIES)
{
// This object deletes itself.
new AsyncControlledShutDownHandler( SHUTDOWN_PERFORM, addrs_ );
}
}
}
}
这样又会对所有的子BaseApp发送SHUTDOWN_PERFORM阶段的消息,在对应的BaseApp::controlledShutDown会构造一个ControlledShutdown::start来开启清理流程:
void BaseApp::controlledShutDown( const Mercury::Address& srcAddr,
const Mercury::UnpackedMessageHeader& header,
BinaryIStream & data )
{
ShutDownStage stage;
GameTime shutDownTime;
data >> stage >> shutDownTime;
INFO_MSG( "BaseApp::controlledShutDown: stage = %s\n",
ServerApp::shutDownStageToString( stage ) );
switch (stage)
{
// 省略其他分支
case SHUTDOWN_PERFORM:
{
if (this->hasStarted())
{
this->callShutDownCallback( 1 );
}
ControlledShutdown::start( pSqliteDB_,
bases_, localServiceFragments_,
header.replyID, srcAddr );
break;
}
}
}
namespace ControlledShutdown
{
void start( SqliteDatabase * pSecondaryDB,
const Bases & bases,
Bases & localServiceFragments,
Mercury::ReplyID replyID,
const Mercury::Address & srcAddr )
{
localServiceFragments.discardAll();
// This object deletes itself.
ControlledShutDownHandler * pHandler = NULL;
if (pSecondaryDB)
{
pHandler = new ShutDownHandlerWithSecondaryDB( *pSecondaryDB );
}
else
{
pHandler = new ShutDownHandlerWithoutSecondaryDB();
}
pHandler->init( bases, replyID, srcAddr );
}
} // namespace ControlledShutdown
这里的ShutDownHandlerWithSecondaryDB和ShutDownHandlerWithoutSecondaryDB都是ControlledShutDownHandler的子类,作用都是将所有的Base执行存库操作,它们的区别是是否有二级数据库。当所有的Base数据都存入数据库了之后,开始通知BaseAppMgr当前BaseApp的清理流程已经完成,同时当前BaseApp调用shutDown函数来执行进行最后的清理并退出:
void ControlledShutDownHandler::checkFinished()
{
if (numOutstanding_ != 0)
{
return;
}
BaseApp * pApp = BaseApp::pInstance();
if (pApp == NULL)
{
ERROR_MSG( "ControlledShutDownHandler::checkFinished: pApp is NULL\n" );
return;
}
BaseAppMgrGateway & baseAppMgr = pApp->baseAppMgr();
IF_NOT_MF_ASSERT_DEV( srcAddr_ == baseAppMgr.addr() )
{
return;
}
baseAppMgr.bundle().startReply( replyID_ );
baseAppMgr.send();
pApp->callShutDownCallback( 2 );
delete this;
pApp->shutDown();
}
当BaseAppMgr接收到了所有的BaseApp都存库完成的回应之后,开始给CellAppMgr来发送SHUTDOWN_PERFORM阶段的消息,同时这里也会调用shutDown函数来触发当前BaseAppMgr的进程退出:
void AsyncControlledShutDownHandler::sendNext()
{
bool shouldDeleteThis = true;
BaseAppMgr * pApp = BaseAppMgr::pInstance();
if (pApp)
{
while (true)
{
if (numToSend_ < int(addrs_.size()))
{
// 省略一些代码
}
else if (stage_ == SHUTDOWN_DISCONNECT_PROXIES)
{
// This object deletes itself.
new AsyncControlledShutDownHandler( SHUTDOWN_PERFORM, addrs_ );
}
else
{
Mercury::Bundle & bundle = pApp->cellAppMgr().bundle();
CellAppMgrInterface::ackBaseAppsShutDownArgs &
rAckBaseAppsShutDown =
CellAppMgrInterface::ackBaseAppsShutDownArgs::start(
bundle );
rAckBaseAppsShutDown.stage = stage_;
pApp->cellAppMgr().send();
pApp->shutDown( false );
}
break;
}
}
if (shouldDeleteThis)
{
delete this;
}
}
当CellAppMgr接收到这个ackBaseAppsShutDown的消息时,当前的pShutDownHandler_已经是PerformBaseAppsHandler了,此时的处理就是通知DBAppMgr当前CellAppMgr的清理流程已经完成,同时调用shutDown来触发当前CellAppMgr的进程退出:
/**
* This method is called to acknowledge that the base apps are in a particular
* shutdown stage.
*/
void CellAppMgr::ackBaseAppsShutDown(
const CellAppMgrInterface::ackBaseAppsShutDownArgs & args )
{
if (pShutDownHandler_)
{
pShutDownHandler_->ackBaseApps( args.stage );
}
}
/**
* This method is called when the BaseApps have all completed the perform
* stage.
*/
void PerformBaseAppsHandler::ackBaseApps( ShutDownStage stage )
{
DBAppMgrInterface::controlledShutDownArgs args;
args.stage = SHUTDOWN_PERFORM;
mgr_.dbAppMgr().bundle() << args;
mgr_.dbAppMgr().send();
mgr_.setShutDownHandler( NULL );
mgr_.shutDown( /* shutDownOthers */ true );
delete this;
}
注意到这里并没有通知所有的CellApp去执行SHUTDOWN_PERFORM,因为CellApp压根没有SHUTDOWN_PERFORM阶段,下面这两个函数的注释里都强调了这个问题:
/**
* This method handles an ack from a CellApp. This should not be called because
* the CellApps are not involved in this stage.
*/
void PerformBaseAppsHandler::ackCellApp( ShutDownStage stage, CellApp & app )
{
ERROR_MSG( "PerformBaseAppsHandler::ackCellApp: Got stage %s from %s\n",
ServerApp::shutDownStageToString( stage ), app.addr().c_str() );
}
/**
* This method handles a message telling us to shut down in a controlled way.
*/
void CellApp::controlledShutDown(
const CellAppInterface::controlledShutDownArgs & args )
{
switch (args.stage)
{
case SHUTDOWN_INFORM:
{
// 省略代码
}
break;
case SHUTDOWN_PERFORM:
{
ERROR_MSG( "CellApp::controlledShutDown: "
"CellApp does not do SHUTDOWN_PERFORM stage.\n" );
// TODO: It could be good to call this so that we can call a script
// method.
break;
}
case SHUTDOWN_NONE:
case SHUTDOWN_REQUEST:
case SHUTDOWN_DISCONNECT_PROXIES:
case SHUTDOWN_TRIGGER:
break;
}
}
这些CellApp的关闭则是在PerformBaseAppsHandler::ackBaseApps里的mgr_.cellAppMgr().shutDown( true );函数调用里触发的,这个函数会遍历当前的所有CellApp,并调用每个CellApp的shutDown函数来触发发送一个shutDown请求到这个CellApp,并最终执行进程退出:
void PerformBaseAppsHandler::ackBaseApps( ShutDownStage stage )
{
DBAppMgrInterface::controlledShutDownArgs args;
args.stage = SHUTDOWN_PERFORM;
mgr_.dbAppMgr().bundle() << args;
mgr_.dbAppMgr().send();
mgr_.setShutDownHandler( NULL );
mgr_.shutDown( /* shutDownOthers */ true );
delete this;
}
/**
* This method shuts down this application.
*/
void CellAppMgr::shutDown( bool shutDownOthers )
{
INFO_MSG( "CellAppMgr::shutDown: shutDownOthers = %d\n", shutDownOthers );
if (shutDownOthers)
{
cellApps_.shutDownAll();
}
INFO_MSG( "CellAppMgr::shutDown: Told to shut down. shutDownOthers = %d\n",
shutDownOthers );
isShuttingDown_ = true;
this->mainDispatcher().breakProcessing();
}
void CellApps::shutDownAll()
{
Map::iterator appIter = map_.begin();
while (appIter != map_.end())
{
CellApp * pApp = appIter->second;
pApp->shutDown();
++appIter;
}
}
void CellApp::shutDown()
{
CellAppInterface::shutDownArgs args;
args.isSigInt = false; // Not used.
this->bundle() << args;
this->send();
}
/**
* This method handles a shutDown message.
*/
void CellApp::shutDown( const CellAppInterface::shutDownArgs & /*args*/ )
{
TRACE_MSG( "CellApp::shutDown: Told to shut down\n" );
this->shutDown();
}
前面PerformBaseAppsHandler::ackBaseApps在CellAppMgr执行shutDown之前会通知controlledShutDown到DBAppMgr,当DBAppMgr收到这个消息的时候,就会通知所有的DbApp执行SHUTDOWN_PERFORM,并调用shutDown来触发当前DBAppMgr的进程退出:
/**
* This method shut down the system in a controlled manner.
*/
void DBAppMgr::controlledShutDown(
const DBAppMgrInterface::controlledShutDownArgs & args )
{
DEBUG_MSG( "DBAppMgr::controlledShutDown: stage = %s\n",
ServerApp::shutDownStageToString( args.stage ) );
switch (args.stage)
{
case SHUTDOWN_REQUEST:
{
// 省略一些代码
}
case SHUTDOWN_PERFORM:
{
INFO_MSG( "DBAppMgr::controlledShutDown: "
"Telling %" PRIzu " DBApps to shut down\n",
dbApps_.size() );
for (DBApps::const_iterator iter = dbApps_.begin();
iter != dbApps_.end();
++iter)
{
iter->second->controlledShutDown( SHUTDOWN_PERFORM );
}
this->interface().processUntilChannelsEmpty();
this->shutDown();
break;
}
default:
ERROR_MSG( "DBAppMgr::controlledShutDown: Stage %s not handled.\n",
ServerApp::shutDownStageToString( args.stage ) );
break;
}
}
现在DBAppMgr, DbApp, CellAppMgr, CellApp, BaseAppMgr, BaseApp都已经完成了有序退出,就剩LoginApp了。事实上前面的BaseAppMgr::controlledShutDownServer有通知LoginApp::controlledShutDown的代码,所以LoginApp也会收到controlledShutDown的消息,从而调用mainDispatcher_.breakProcessing触发进程退出:
void BaseAppMgr::controlledShutDownServer()
{
if (shutDownStage_ != SHUTDOWN_NONE)
{
DEBUG_MSG( "BaseAppMgr::controlledShutDownServer: "
"Already shutting down, ignoring duplicate shutdown request.\n" );
return;
}
// First try to trigger controlled shutdown via the loginapp
Mercury::Address loginAppAddr;
Mercury::Reason reason = Mercury::MachineDaemon::findInterface(
"LoginIntInterface", -1, loginAppAddr );
if (reason == Mercury::REASON_SUCCESS)
{
Mercury::ChannelSender sender( BaseAppMgr::getChannel( loginAppAddr ) );
Mercury::Bundle & bundle = sender.bundle();
bundle.startMessage( LoginIntInterface::controlledShutDown );
INFO_MSG( "BaseAppMgr::controlledShutDownServer: "
"Triggering server shutdown via LoginApp @ %s\n",
loginAppAddr.c_str() );
return;
}
else
{
ERROR_MSG( "BaseAppMgr::controlledShutDownServer: "
"Couldn't find a LoginApp to trigger server shutdown\n" );
}
// 省略一些代码
}
/**
* Handles incoming shutdown requests. This is basically another way of
* triggering a controlled system shutdown instead of sending a SIGUSR1.
*/
void LoginApp::controlledShutDown( const Mercury::Address &source,
Mercury::UnpackedMessageHeader &header,
BinaryIStream &data )
{
INFO_MSG( "LoginApp::controlledShutDown: "
"Got shutdown command from %s\n", source.c_str() );
this->controlledShutDown();
}
void LoginApp::controlledShutDown()
{
mainDispatcher_.breakProcessing();
}
在LoginApp进程退出的时候,还会通知一下DbAppMgr执行SHUTDOWN_REQUEST:
void LoginApp::onRunComplete() /* override */
{
INFO_MSG( "LoginApp::run: Terminating normally.\n" );
this->ServerApp::onRunComplete();
bool sent = false;
if (this->isDBAppMgrReady())
{
Mercury::Bundle & dbMgrBundle = dbAppMgr_.pChannelOwner()->bundle();
DBAppMgrInterface::controlledShutDownArgs args;
args.stage = SHUTDOWN_REQUEST;
dbMgrBundle << args;
dbAppMgr_.pChannelOwner()->send();
sent = true;
}
if (sent)
{
this->intInterface().processUntilChannelsEmpty();
}
}
但是这里DBAppMgr::controlledShutDown处理SHUTDOWN_REQUEST时,又会去通知BaseAppMgr执行SHUTDOWN_REQUEST,感觉有点没必要,因为CellApp和BaseApp发起的SHUTDOWN_REQUEST都会通知到BaseAppMgr:
void DBAppMgr::controlledShutDown(
const DBAppMgrInterface::controlledShutDownArgs & args )
{
DEBUG_MSG( "DBAppMgr::controlledShutDown: stage = %s\n",
ServerApp::shutDownStageToString( args.stage ) );
switch (args.stage)
{
case SHUTDOWN_REQUEST:
{
// Make sure we no longer send to anonymous channels etc.
interface_.stopPingingAnonymous();
isShuttingDown_ = true;
BaseAppMgrInterface::controlledShutDownArgs & args =
args.start( baseAppMgr_.bundle() );
args.stage = SHUTDOWN_REQUEST;
args.shutDownTime = 0;
baseAppMgr_.send();
break;
}
case SHUTDOWN_PERFORM:
{
// 省略一些代码
}
default:
ERROR_MSG( "DBAppMgr::controlledShutDown: Stage %s not handled.\n",
ServerApp::shutDownStageToString( args.stage ) );
break;
}
}
后面继续分析代码发现,LoginApp可以单独接受SIGUSR1信号,触发controlledShutDown,从而导致进程退出。此时由于不是通过CellApp或BaseApp发起的SHUTDOWN_REQUEST,所以这里LoginApp退出的时候通知DBAppMgr并间接通知BaseAppMgr执行SHUTDOWN_REQUEST是有必要的:
void LoginApp::controlledShutDown()
{
mainDispatcher_.breakProcessing();
}
/*
* Override from ServerApp.
*/
void LoginApp::onSignalled( int sigNum ) /* override */
{
this->ServerApp::onSignalled( sigNum );
if (sigNum == SIGUSR1)
{
this->controlledShutDown();
}
}
BWMachineD的清理
当一个进程对应的ServerApp执行完逻辑之后,需要从本地的BWMachineD里彻底删除这个ServerApp的注册信息。这个注册信息的删除逻辑在NetworkInterface的析构函数里。这个析构函数会通知本地的BWMachineD来执行进程的反注册并关闭所有的网络连接,最后关闭监听socket:
/**
* Destructor.
*/
NetworkInterface::~NetworkInterface()
{
this->interfaceTable().deregisterWithMachined( this->address() );
// This cancels outstanding requests. Need to make sure no more are added.
this->finaliseRequestManager();
pChannelMap_->destroyOwnedChannels();
this->detach();
this->closeSocket();
bw_safe_delete( pDelayedChannels_ );
bw_safe_delete( pIrregularChannels_ );
bw_safe_delete( pKeepAliveChannels_ );
bw_safe_delete( pCondemnedChannels_ );
bw_safe_delete( pOnceOffSender_ );
bw_safe_delete( pInterfaceTable_ );
bw_safe_delete( pPacketSender_ );
bw_safe_delete( pPacketReceiver_ );
bw_safe_delete( pPacketLossParameters_ );
bw_safe_delete( pDispatcher_ );
bw_safe_delete( pChannelMap_ );
bw_safe_delete( pRecentlyDeadChannels_ );
}
这里的deregisterWithMachined会调用之前提到的registerWithMachined方法,只是将最后一个参数isRegister设置为false,来表示这是一个反注册请求:
/**
* This function deregisters a socket with BWMachined.
*/
Reason deregisterWithMachined( const Address & srcAddr,
const BW::string & name, int id )
{
return name.empty() ?
REASON_SUCCESS :
registerWithMachined( srcAddr, name, id, /*isRegister:*/ false );
}
当本机的BWMachineD接收到这个反注册请求之后,会调用removeRegisteredProc方法来移除这个进程的注册信息:
case ProcessMessage::DEREGISTER:
{
unsigned int i;
for (i=0; i < procs_.size(); i++)
if (pm.pid_ == procs_[i].m.pid_)
break;
if (i >= procs_.size())
syslog( LOG_ERR, "Couldn't find pid %d to deregister it\n",
pm.pid_ );
else
removeRegisteredProc( i );
// confirm the deregistration to the sender
pm.outgoing( true );
replies.append( pm );
syslog( LOG_INFO, "Deregistered %s for uid:%d\n",
pm.c_str(), pm.uid_ );
return true;
}
在这个方法里,除了会从procs数组里进行元素的移除之外,还会调用broadcastToListeners方法来广播到所有的BWMachineD进程,通知所有监听者这个进程已经死亡:
void BWMachined::removeRegisteredProc( unsigned index )
{
if (index >= procs_.size())
{
syslog( LOG_ERR, "Can't remove reg proc at index %d/%" PRIzu "",
index, procs_.size() );
return;
}
ProcessInfo &pinfo = procs_[ index ];
ProcessMessage pm;
pm << pinfo.m;
this->broadcastToListeners( pm, pm.NOTIFY_DEATH );
procs_.erase( procs_.begin() + index );
}
case ProcessMessage::NOTIFY_DEATH:
{
deathListeners_.handleNotify( this->endpoint(), pm, sin.sin_addr );
return true;
}
走完这个流程之后,这个进程就彻底的从集群里的BWMachined里消失了,其他进程再也无法连接到这个进程了。