Network Communication in Unreal Engine
Basic Concepts
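Unreal Engine (hereafter UE) can compile both the game client and the Dedicated Server from the same codebase, and the client and the dedicated server communicate over UDP. UDP was chosen because UE has been closely tied to FPS games since its inception, and FPS games have an extremely low tolerance for sudden latency spikes: with TCP, a single lost segment holds up all data queued behind it until it has been retransmitted. On top of UDP, UE adds its own layers to provide reliable delivery between client and server where it is needed. This article walks through the implementation details of UE networking.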
UE networking is built on the following five basic concepts:
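- NetDriver: the core of network handling, responsible for managing all physical data connections; in practice its subclass IpNetDriver is used. It encapsulates basic operations such as initializing the client-server connection, building property replication tables, processing RPCs, creating sockets, creating and managing Connection state, and receiving packets. The server's NetDriver maintains a list of NetConnections, each representing one connected player client, and replicates Actor data to them. The client's NetDriver manages the single connection to the server.
- NetConnection: one connection between the local NetDriver and a remote peer. On both server and client, the NetDriver receives packets from the network and hands each one to the appropriate NetConnection, creating a new NetConnection when necessary.
- Channel: a data channel; each channel exchanges only one specific type of data for one specific instance. UE predefines the following channels:
  - ControlChannel: carries control messages between client and server, mainly messages about establishing and closing the connection. Each Connection creates exactly one instance of it when the connection is initialized.
  - ActorChannel: handles everything about one Actor, including replication of the Actor itself and its subcomponents, property replication, and RPC calls. Every replicated Actor on every Connection has its own ActorChannel instance.
  - VoiceChannel: sends and receives voice data; each Connection creates one instance when the connection is initialized.
- Packet: the data sent over the network connection between client and server. A packet consists of packet metadata (such as header information and acks) followed by bunches, and it is what actually travels over the UDP link.
- Bunch: the data sent between a pair of channels (one on each end) of a connection. When a connection receives a packet, the packet is broken into individual bunches, which are then passed to their respective channels for further processing. A packet may contain no bunch, a single bunch, or multiple bunches. When a bunch is too large, it is cut into several smaller bunches before transmission; these are marked PartialInitial, Partial, or PartialFinal, and the receiver uses these markers to reassemble the original bunch. Bunches are further divided into reliable and unreliable bunches: reliable bunches are retransmitted when lost and are received in the order in which they were sent, while nothing at all is done when an unreliable bunch is lost.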
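The split markers are easiest to understand from the receiving side. Below is a minimal reassembly sketch in standard C++. `PartialBunch` and `PartialReassembler` are hypothetical types, not UE's bunch or channel classes; the sketch assumes the pieces of one split bunch arrive in order (as reliable delivery guarantees) and treats payloads as whole bytes for simplicity.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical, simplified view of a received bunch (not a UE type).
struct PartialBunch {
    bool bPartial = false;        // this bunch is one piece of a split bunch
    bool bPartialInitial = false; // first piece
    bool bPartialFinal = false;   // last piece
    std::vector<uint8_t> Payload;
};

// Accumulates the pieces of one split bunch; assumes in-order arrival.
class PartialReassembler {
public:
    // Returns the complete payload once it is available, otherwise nullopt.
    std::optional<std::vector<uint8_t>> Receive(const PartialBunch& Bunch) {
        if (!Bunch.bPartial) {
            return Bunch.Payload; // unsplit bunch: deliver as-is
        }
        if (Bunch.bPartialInitial) {
            Buffer = Bunch.Payload; // start a new reassembly, dropping any stale one
            bInProgress = true;
        } else if (bInProgress) {
            Buffer.insert(Buffer.end(), Bunch.Payload.begin(), Bunch.Payload.end());
        } else {
            return std::nullopt; // middle/final piece without an initial piece: discard
        }
        if (Bunch.bPartialFinal) {
            bInProgress = false;
            return std::move(Buffer);
        }
        return std::nullopt;
    }

private:
    std::vector<uint8_t> Buffer;
    bool bInProgress = false;
};
```

A piece that arrives without a preceding PartialInitial is simply discarded in this sketch; how the engine handles that case is not covered here.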
Sending Messages
Almost every outgoing message in UE networking goes through UChannel::SendBunch:
/** Send a bunch if it's not overflowed, and queue it if it's reliable. */
ENGINE_API virtual FPacketIdRange SendBunch(FOutBunch* Bunch, bool Merge);
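For example, when ReplicationGraph replicates a single Actor's data down to a client, once all the data has been filled in, it sends it through the Actor's channel with code like this: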
int64 UReplicationGraph::ReplicateSingleActor_FastShared(AActor* Actor, FConnectionReplicationActorInfo& ConnectionData,
FGlobalActorReplicationInfo& GlobalActorInfo, UNetReplicationGraphConnection& ConnectionManager, const uint32 FrameNum)
{
// (data-filling code omitted)
// Setup the connection specifics on the bunch before calling SendBunch
OutBunch.ChName = ActorChannel->ChName;
OutBunch.ChIndex = ActorChannel->ChIndex;
OutBunch.Channel = ActorChannel;
OutBunch.Next = nullptr;
// SendIt
{
FGuardValue_Bitfield(ActorChannel->bHoldQueuedExportBunchesAndGUIDs, true);
ActorChannel->SendBunch(&OutBunch, false);
}
}
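FOutBunch is fairly simple: it derives from FNetBitWriter so that it can accept bit-level writes, and it also carries a good deal of transmission-control information, such as the channel it belongs to: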
//
// A bunch of data to send.
//
class ENGINE_API FOutBunch : public FNetBitWriter
{
public:
// Variables.
FOutBunch * Next;
UChannel * Channel;
double Time;
int32 ChIndex;
FName ChName;
int32 ChSequence;
int32 PacketId;
uint8 ReceivedAck:1;
uint8 bOpen:1;
uint8 bClose:1;
uint8 bIsReplicationPaused:1; // Replication on this channel is being paused by the server
uint8 bReliable:1;
uint8 bPartial:1; // Not a complete bunch
uint8 bPartialInitial:1; // The first bunch of a partial bunch
uint8 bPartialFinal:1; // The final bunch of a partial bunch
uint8 bHasPackageMapExports:1; // This bunch has networkGUID name/id pairs
uint8 bHasMustBeMappedGUIDs:1; // This bunch has guids that must be mapped before we can process this bunch
EChannelCloseReason CloseReason;
TArray< FNetworkGUID > ExportNetGUIDs; // List of GUIDs that went out on this bunch
TArray< uint64 > NetFieldExports;
};
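Here Time is the timestamp at which the bunch was sent; bOpen/bClose control the opening and closing of a channel; bReliable marks whether the bunch must be delivered reliably; and bPartial/bPartialInitial/bPartialFinal control how a bunch is split. SendBunch first calls IsBunchTooLarge to check whether the bunch exceeds the maximum size NetMaxConstructedPartialBunchSizeBytes (65536 bytes by default, adjustable through the net.MaxConstructedPartialBunchSizeBytes console variable); a bunch that is too large is marked as errored and is not sent. It then checks whether the bunch can be merged into the last bunch written to the connection's send buffer, so that the merged data needs only one bunch header instead of two: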
// Fairly large number, and probably a bad idea to even have a bunch this size, but want to be safe for now and not throw out legitimate data
static int32 NetMaxConstructedPartialBunchSizeBytes = 1024 * 64;
static FAutoConsoleVariableRef CVarNetMaxConstructedPartialBunchSizeBytes(
TEXT("net.MaxConstructedPartialBunchSizeBytes"),
NetMaxConstructedPartialBunchSizeBytes,
TEXT("The maximum size allowed for Partial Bunches.")
);
FPacketIdRange UChannel::SendBunch( FOutBunch* Bunch, bool Merge )
{
if (IsBunchTooLarge(Connection, Bunch))
{
UE_LOG(LogNetPartialBunch, Error, TEXT("Attempted to send bunch exceeding max allowed size. BunchSize=%d, MaximumSize=%d"), Bunch->GetNumBytes(), NetMaxConstructedPartialBunchSizeBytes);
Bunch->SetError();
return FPacketIdRange(INDEX_NONE);
}
// (much code omitted)
// This is the max number of bits we can have in a single bunch
const int64 MAX_SINGLE_BUNCH_SIZE_BITS = Connection->GetMaxSingleBunchSizeBits();
// Max bytes we'll put in a partial bunch
const int64 MAX_SINGLE_BUNCH_SIZE_BYTES = MAX_SINGLE_BUNCH_SIZE_BITS / 8;
// Max bits will put in a partial bunch (byte aligned, we dont want to deal with partial bytes in the partial bunches)
const int64 MAX_PARTIAL_BUNCH_SIZE_BITS = MAX_SINGLE_BUNCH_SIZE_BYTES * 8;
//-----------------------------------------------------
// Contemplate merging.
//-----------------------------------------------------
int32 PreExistingBits = 0;
FOutBunch* OutBunch = NULL;
if
( Merge
&& Connection->LastOut.ChIndex == Bunch->ChIndex
&& Connection->LastOut.bReliable == Bunch->bReliable // Don't merge bunches of different reliability, since for example a reliable RPC can cause a bunch with properties to become reliable, introducing unnecessary latency for the properties.
&& Connection->AllowMerge
&& Connection->LastEnd.GetNumBits()
&& Connection->LastEnd.GetNumBits()==Connection->SendBuffer.GetNumBits()
&& Connection->LastOut.GetNumBits() + Bunch->GetNumBits() <= MAX_SINGLE_BUNCH_SIZE_BITS )
{
// Merge.
check(!Connection->LastOut.IsError());
PreExistingBits = Connection->LastOut.GetNumBits();
Connection->LastOut.SerializeBits( Bunch->GetData(), Bunch->GetNumBits() );
Connection->LastOut.bOpen |= Bunch->bOpen;
Connection->LastOut.bClose |= Bunch->bClose;
#if UE_NET_TRACE_ENABLED
SetTraceCollector(Connection->LastOut, GetTraceCollector(*Bunch));
SetTraceCollector(*Bunch, nullptr);
#endif
OutBunch = Connection->LastOutBunch;
Bunch = &Connection->LastOut;
check(!Bunch->IsError());
Connection->PopLastStart();
Connection->Driver->OutBunches--;
}
// (much code omitted)
}
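The merge conditions in the `if` statement above can be restated as a single predicate. This is a sketch with hypothetical names: `BunchInfo` stands in for the FOutBunch fields that are compared, and `bLastOutEndsSendBuffer` stands in for the check that `LastEnd` is non-empty and equals the current SendBuffer length, i.e. nothing has been written after the previous bunch in the packet being built.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical summary of the bunch fields SendBunch compares (not the UE types).
struct BunchInfo {
    int32_t ChIndex = 0;
    bool bReliable = false;
    int64_t NumBits = 0;
};

// Mirrors the merge conditions in UChannel::SendBunch.
bool CanMerge(bool bMergeRequested, bool bConnectionAllowsMerge,
              bool bLastOutEndsSendBuffer, const BunchInfo& LastOut,
              const BunchInfo& Bunch, int64_t MaxSingleBunchSizeBits) {
    return bMergeRequested                              // caller passed Merge = true
        && LastOut.ChIndex == Bunch.ChIndex             // same channel
        && LastOut.bReliable == Bunch.bReliable         // same reliability
        && bConnectionAllowsMerge                       // Connection->AllowMerge
        && bLastOutEndsSendBuffer                       // LastOut is the last thing in the packet
        && LastOut.NumBits + Bunch.NumBits <= MaxSingleBunchSizeBits; // merged size still fits
}
```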
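Merging is subject to several conditions: the two bunches must belong to the same channel and have the same bReliable value; the caller must request a merge (the Merge parameter) and the connection must currently allow merging (AllowMerge); the previous bunch (LastOut) must be the last thing written into the current SendBuffer; and the merged bunch must not exceed MAX_SINGLE_BUNCH_SIZE_BITS. This size cap keeps every bunch small enough to fit into one packet within the MTU, so that the UDP datagram is not fragmented at the IP layer. MAX_SINGLE_BUNCH_SIZE_BITS is computed by UNetConnection::GetMaxSingleBunchSizeBits(); note that the unit here is bits, not the more common bytes: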
/** The maximum number of bits allowed within a single bunch. */
FORCEINLINE int32 GetMaxSingleBunchSizeBits() const
{
return (MaxPacket * 8) - MAX_BUNCH_HEADER_BITS - MAX_PACKET_TRAILER_BITS - MAX_PACKET_HEADER_BITS - MaxPacketHandlerBits;
}
void UNetConnection::InitConnection(UNetDriver* InDriver, EConnectionState InState, const FURL& InURL, int32 InConnectionSpeed, int32 InMaxPacket)
{
Driver = InDriver;
// We won't be sending any packets, so use a default size
MaxPacket = (InMaxPacket == 0 || InMaxPacket > MAX_PACKET_SIZE) ? MAX_PACKET_SIZE : InMaxPacket;
// (remaining code omitted)
}
enum { MAX_PACKET_SIZE = 1024 }; // MTU for the connection
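The size limits reduce to simple arithmetic. The sketch below reproduces the MaxPacket clamp from InitConnection, the GetMaxSingleBunchSizeBits formula, and the rounding down to whole bytes that SendBunch applies to partial bunches. The engine's actual values of MAX_BUNCH_HEADER_BITS, MAX_PACKET_TRAILER_BITS, MAX_PACKET_HEADER_BITS and MaxPacketHandlerBits are not shown in this article, so they are parameters here, and the numbers in the usage note are made up.

```cpp
#include <cassert>
#include <cstdint>

constexpr int32_t kMaxPacketSize = 1024; // MAX_PACKET_SIZE, in bytes

// Same clamp as UNetConnection::InitConnection.
int32_t ClampMaxPacket(int32_t InMaxPacket) {
    return (InMaxPacket == 0 || InMaxPacket > kMaxPacketSize) ? kMaxPacketSize : InMaxPacket;
}

// Same formula as GetMaxSingleBunchSizeBits; overheads are passed in because
// their real engine values are not reproduced here.
int32_t MaxSingleBunchSizeBits(int32_t MaxPacket, int32_t BunchHeaderBits,
                               int32_t PacketTrailerBits, int32_t PacketHeaderBits,
                               int32_t PacketHandlerBits) {
    return MaxPacket * 8 - BunchHeaderBits - PacketTrailerBits - PacketHeaderBits - PacketHandlerBits;
}

// SendBunch rounds the single-bunch limit down to whole bytes for partial bunches.
int32_t MaxPartialBunchSizeBits(int32_t MaxSingleBits) {
    return (MaxSingleBits / 8) * 8;
}
```

With made-up overheads of 100, 1, 50 and 0 bits, a 1024-byte packet leaves 8192 - 151 = 8041 bits for one bunch, and partial bunches would be cut at 8040 bits (1005 bytes).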
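MaxPacket has a hard upper limit, MAX_PACKET_SIZE, set to 1024 bytes. The common Ethernet MTU is 1500 bytes (1492 over PPPoE), and the IPv4 and UDP headers take another 28 bytes, so UE's limit is deliberately conservative. Because of this MTU limit, a bunch that turns out to be too large once it has been filled must be split: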
FPacketIdRange UChannel::SendBunch( FOutBunch* Bunch, bool Merge )
{
// (preceding code omitted)
//-----------------------------------------------------
// Possibly split large bunch into list of smaller partial bunches
//-----------------------------------------------------
if( Bunch->GetNumBits() > MAX_SINGLE_BUNCH_SIZE_BITS )
{
uint8 *data = Bunch->GetData();
int64 bitsLeft = Bunch->GetNumBits();
Merge = false;
while(bitsLeft > 0)
{
FOutBunch * PartialBunch = new FOutBunch(this, false);
int64 bitsThisBunch = FMath::Min<int64>(bitsLeft, MAX_PARTIAL_BUNCH_SIZE_BITS);
PartialBunch->SerializeBits(data, bitsThisBunch);
#if UE_NET_TRACE_ENABLED
// Attach tracecollector of split bunch to first partial bunch
SetTraceCollector(*PartialBunch, GetTraceCollector(*Bunch));
SetTraceCollector(*Bunch, nullptr);
#endif
OutgoingBunches.Add(PartialBunch);
#if !(UE_BUILD_SHIPPING || UE_BUILD_TEST)
PartialBunch->DebugString = FString::Printf(TEXT("Partial[%d]: %s"), OutgoingBunches.Num(), *Bunch->DebugString);
#endif
bitsLeft -= bitsThisBunch;
data += (bitsThisBunch >> 3);
UE_LOG(LogNetPartialBunch, Log, TEXT(" Making partial bunch from content bunch. bitsThisBunch: %d bitsLeft: %d"), bitsThisBunch, bitsLeft );
ensure(bitsLeft == 0 || bitsThisBunch % 8 == 0); // Byte aligned or it was the last bunch
}
}
else
{
OutgoingBunches.Add(Bunch);
}
// (remaining code omitted)
}
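When splitting, SendBunch creates one or more PartialBunch objects, fills each with at most MAX_PARTIAL_BUNCH_SIZE_BITS bits taken from the original bunch, and appends them in order to the OutgoingBunches array. MAX_PARTIAL_BUNCH_SIZE_BITS is MAX_SINGLE_BUNCH_SIZE_BITS rounded down to whole bytes, so every piece except the last is byte-aligned. Merging is disabled for a bunch that has to be split.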
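After merging and splitting, SendBunch iterates over OutgoingBunches and sets the relevant fields on each bunch, mainly bPartial/bPartialInitial/bPartialFinal: bPartial marks the bunch as one piece of a split bunch, bPartialInitial marks the first piece, and bPartialFinal marks the last piece. Only the first piece keeps bOpen, and only the last piece keeps bClose.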
UE_CLOG((OutgoingBunches.Num() > 1), LogNetPartialBunch, Log, TEXT("Sending %d Bunches. Channel: %d %s"), OutgoingBunches.Num(), Bunch->ChIndex, *Describe());
for( int32 PartialNum = 0; PartialNum < OutgoingBunches.Num(); ++PartialNum)
{
FOutBunch * NextBunch = OutgoingBunches[PartialNum];
NextBunch->bReliable = Bunch->bReliable;
NextBunch->bOpen = Bunch->bOpen;
NextBunch->bClose = Bunch->bClose;
NextBunch->CloseReason = Bunch->CloseReason;
NextBunch->bIsReplicationPaused = Bunch->bIsReplicationPaused;
NextBunch->ChIndex = Bunch->ChIndex;
NextBunch->ChName = Bunch->ChName;
if ( !NextBunch->bHasPackageMapExports )
{
NextBunch->bHasMustBeMappedGUIDs |= Bunch->bHasMustBeMappedGUIDs;
}
if (OutgoingBunches.Num() > 1)
{
NextBunch->bPartial = 1;
NextBunch->bPartialInitial = (PartialNum == 0 ? 1: 0);
NextBunch->bPartialFinal = (PartialNum == OutgoingBunches.Num() - 1 ? 1: 0);
NextBunch->bOpen &= (PartialNum == 0); // Only the first bunch should have the bOpen bit set
NextBunch->bClose = (Bunch->bClose && (OutgoingBunches.Num()-1 == PartialNum)); // Only last bunch should have bClose bit set
}
FOutBunch *ThisOutBunch = PrepBunch(NextBunch, OutBunch, Merge); // This handles queuing reliable bunches into the ack list
// Update Packet Range
int32 PacketId = SendRawBunch(ThisOutBunch, Merge, GetTraceCollector(*NextBunch));
if (PartialNum == 0)
{
PacketIdRange = FPacketIdRange(PacketId);
}
else
{
PacketIdRange.Last = PacketId;
}
// Update channel sequence count.
Connection->LastOut = *ThisOutBunch;
Connection->LastEnd = FBitWriterMark( Connection->SendBuffer );
}
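The split loop and the flag loop can be combined into one small function. This is a sketch with a hypothetical `Piece` struct that records only sizes and flags instead of copying bits; like SendBunch, it cuts full-size pieces of MaxPartialBits (a multiple of 8) and leaves the remainder in the last piece.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical record of one outgoing piece: sizes and flags only, no payload.
struct Piece {
    int64_t Bits = 0;
    bool bPartial = false;
    bool bPartialInitial = false;
    bool bPartialFinal = false;
    bool bOpen = false;
    bool bClose = false;
};

// Splits TotalBits like SendBunch: pieces of MaxPartialBits, remainder last,
// then sets the partial/open/close flags the way the flag loop does.
std::vector<Piece> SplitBunch(int64_t TotalBits, int64_t MaxSingleBits, int64_t MaxPartialBits,
                              bool bOpen, bool bClose) {
    std::vector<Piece> Pieces;
    if (TotalBits <= MaxSingleBits) {
        Pieces.push_back({TotalBits, false, false, false, bOpen, bClose}); // fits: no split
        return Pieces;
    }
    for (int64_t BitsLeft = TotalBits; BitsLeft > 0;) {
        const int64_t BitsThis = BitsLeft < MaxPartialBits ? BitsLeft : MaxPartialBits;
        Pieces.push_back({BitsThis});
        BitsLeft -= BitsThis;
    }
    const std::size_t N = Pieces.size();
    for (std::size_t i = 0; i < N; ++i) {
        Pieces[i].bPartial = N > 1;
        Pieces[i].bPartialInitial = N > 1 && i == 0;
        Pieces[i].bPartialFinal = N > 1 && i == N - 1;
        Pieces[i].bOpen = bOpen && i == 0;       // only the first piece opens the channel
        Pieces[i].bClose = bClose && i == N - 1; // only the last piece closes it
    }
    return Pieces;
}
```

For example, a 20000-bit bunch with an 8040-bit piece size becomes pieces of 8040, 8040 and 3920 bits, with bOpen only on the first piece and bClose only on the last.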
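If the bunch must be delivered reliably, PrepBunch assigns it a sequence number ChSequence that increments per channel (the counter is Connection->OutReliable[ChIndex]), and appends a copy of the bunch to the end of the channel's list of reliable bunches, OutRec, where it is kept until it is acknowledged. When the bunch is being merged into the previous one, the existing record is overwritten instead: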
// OUtbunch is a bunch that was new'd by the network system or NULL. It should never be one created on the stack
FOutBunch* UChannel::PrepBunch(FOutBunch* Bunch, FOutBunch* OutBunch, bool Merge)
{
if ( Connection->ResendAllDataState != EResendAllDataState::None )
{
return Bunch;
}
// Find outgoing bunch index.
if( Bunch->bReliable )
{
// Find spot, which was guaranteed available by FOutBunch constructor.
if( OutBunch==NULL )
{
Bunch->Next = NULL;
Bunch->ChSequence = ++Connection->OutReliable[ChIndex];
NumOutRec++;
OutBunch = new FOutBunch(*Bunch);
FOutBunch** OutLink = &OutRec;
while(*OutLink) // This was rewritten from a single-line for loop due to compiler complaining about empty body for loops (-Wempty-body)
{
OutLink=&(*OutLink)->Next;
}
*OutLink = OutBunch;
}
else
{
Bunch->Next = OutBunch->Next;
*OutBunch = *Bunch;
}
Connection->LastOutBunch = OutBunch;
}
else
{
OutBunch = Bunch;
Connection->LastOutBunch = NULL;//warning: Complex code, don't mess with this!
}
return OutBunch;
}
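The reliable bookkeeping in PrepBunch amounts to a per-channel counter plus an ordered list of unacknowledged bunches. The sketch below is a simplified model rather than UE's code: `ReliableSendQueue` is hypothetical, it acknowledges sequence numbers directly (UE acknowledges packets and maps them back to the bunches they carried), and it assumes acknowledged records are released only from the head of the list so that the remaining ones stay in send order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical stand-in for a reliable FOutBunch kept in OutRec.
struct ReliableRecord {
    int32_t ChSequence = 0;
    std::vector<uint8_t> Payload;
    bool bAcked = false;
};

class ReliableSendQueue {
public:
    // Like PrepBunch for a new reliable bunch: take the next sequence number for
    // this channel and append the record to the end of the outstanding list.
    int32_t Queue(std::vector<uint8_t> Payload) {
        ++OutReliable;
        Outstanding.push_back({OutReliable, std::move(Payload), false});
        return OutReliable;
    }
    // Marks a sequence number as acknowledged, then releases acknowledged
    // records from the head only, so the rest stay in send order.
    void Ack(int32_t ChSequence) {
        for (ReliableRecord& Rec : Outstanding) {
            if (Rec.ChSequence == ChSequence) { Rec.bAcked = true; }
        }
        while (!Outstanding.empty() && Outstanding.front().bAcked) {
            Outstanding.pop_front();
        }
    }
    std::size_t NumOutstanding() const { return Outstanding.size(); }

private:
    int32_t OutReliable = 0; // last sequence number used on this channel
    std::deque<ReliableRecord> Outstanding;
};
```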
Right after PrepBunch, SendRawBunch writes the bunch into the connection's outgoing packet and returns the PacketId of the packet it was written into:
int32 UChannel::SendRawBunch(FOutBunch* OutBunch, bool Merge, const FNetTraceCollector* Collector)
{
// Sending for checkpoints may need to send an open bunch if the actor went dormant, so allow the OpenPacketId to be set
// Send the raw bunch.
OutBunch->ReceivedAck = 0;
int32 PacketId = Connection->SendRawBunch(*OutBunch, Merge, Collector);
if( OpenPacketId.First==INDEX_NONE && OpenedLocally )
{
OpenPacketId = FPacketIdRange(PacketId);
}
if( OutBunch->bClose )
{
SetClosingFlag();
}
return PacketId;
}
This calls UNetConnection::SendRawBunch, which converts the bunch into the data that is actually sent over the network, i.e. appends it to the packet's binary stream:
int32 UNetConnection::SendRawBunch(FOutBunch& Bunch, bool InAllowMerge, const FNetTraceCollector* BunchCollector)
{
ValidateSendBuffer();
check(!Bunch.ReceivedAck);
check(!Bunch.IsError());
Driver->OutBunches++;
Driver->OutTotalBunches++;
TimeSensitive = 1;
// Build header.
SendBunchHeader.Reset();
const bool bIsOpenOrClose = Bunch.bOpen || Bunch.bClose;
const bool bIsOpenOrReliable = Bunch.bOpen || Bunch.bReliable;
SendBunchHeader.WriteBit(bIsOpenOrClose);
if (bIsOpenOrClose)
{
SendBunchHeader.WriteBit(Bunch.bOpen);
SendBunchHeader.WriteBit(Bunch.bClose);
if (Bunch.bClose)
{
uint32 Value = (uint32)Bunch.CloseReason;
SendBunchHeader.SerializeInt(Value, (uint32)EChannelCloseReason::MAX);
}
}
SendBunchHeader.WriteBit(Bunch.bIsReplicationPaused);
SendBunchHeader.WriteBit(Bunch.bReliable);
uint32 ChIndex = Bunch.ChIndex;
SendBunchHeader.SerializeIntPacked(ChIndex);
SendBunchHeader.WriteBit(Bunch.bHasPackageMapExports);
SendBunchHeader.WriteBit(Bunch.bHasMustBeMappedGUIDs);
SendBunchHeader.WriteBit(Bunch.bPartial);
if (Bunch.bReliable && !IsInternalAck())
{
SendBunchHeader.WriteIntWrapped(Bunch.ChSequence, MAX_CHSEQUENCE);
}
if (Bunch.bPartial)
{
SendBunchHeader.WriteBit(Bunch.bPartialInitial);
SendBunchHeader.WriteBit(Bunch.bPartialFinal);
}
if (bIsOpenOrReliable)
{
UPackageMap::StaticSerializeName(SendBunchHeader, Bunch.ChName);
}
SendBunchHeader.WriteIntWrapped(Bunch.GetNumBits(), UNetConnection::MaxPacket * 8);
check(!SendBunchHeader.IsError());
// Remember start position.
AllowMerge = InAllowMerge;
Bunch.Time = Driver->GetElapsedTime();
NETWORK_PROFILER(GNetworkProfiler.PushSendBunch(this, &Bunch, SendBunchHeader.GetNumBits(), Bunch.GetNumBits()));
const int32 BunchHeaderBits = SendBunchHeader.GetNumBits();
const int32 BunchBits = Bunch.GetNumBits();
// If the bunch does not fit in the current packet,
// flush packet now so that we can report collected stats in the correct scope
PrepareWriteBitsToSendBuffer(BunchHeaderBits, BunchBits);
// Report bunch
UE_NET_TRACE_END_BUNCH(OutTraceCollector, Bunch, Bunch.ChName, 0, BunchHeaderBits, BunchBits, BunchCollector);
// Write the bits to the buffer and remember the packet id used
Bunch.PacketId = WriteBitsToSendBufferInternal(SendBunchHeader.GetData(), BunchHeaderBits, Bunch.GetData(), BunchBits, EWriteBitsDataType::Bunch);
// Track channels that wrote data to this packet.
FChannelRecordImpl::PushChannelRecord(ChannelRecord, Bunch.PacketId, Bunch.ChIndex);
// (some PackageMapClient-related logic omitted)
if (bAutoFlush)
{
FlushNet();
}
return Bunch.PacketId;
}
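To show why the header size varies from bunch to bunch, here is a minimal LSB-first bit writer and a simplified header writer that keeps only some of the fields SendRawBunch writes, in the same order. `BitWriter`, `HeaderFlags` and `WriteSimplifiedHeader` are hypothetical; the 8-bit channel index and 10-bit sequence widths are illustrative stand-ins for SerializeIntPacked and WriteIntWrapped, whose exact encodings are not reproduced here. The point is the structure: an optional field costs bits only when its guard flag is set.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Minimal LSB-first bit writer (hypothetical; not FBitWriter).
class BitWriter {
public:
    void WriteBit(bool Bit) {
        if (NumBits % 8 == 0) { Bytes.push_back(0); }
        if (Bit) { Bytes.back() = static_cast<uint8_t>(Bytes.back() | (1u << (NumBits % 8))); }
        ++NumBits;
    }
    // Writes the low Count bits of Value, least significant bit first.
    void WriteBits(uint32_t Value, int Count) {
        for (int i = 0; i < Count; ++i) { WriteBit(((Value >> i) & 1u) != 0); }
    }
    int64_t GetNumBits() const { return NumBits; }
    const std::vector<uint8_t>& GetBytes() const { return Bytes; }

private:
    std::vector<uint8_t> Bytes;
    int64_t NumBits = 0;
};

// Subset of the bunch flags used below (hypothetical struct).
struct HeaderFlags {
    bool bOpen, bClose, bReliable, bPartial, bPartialInitial, bPartialFinal;
};

// Keeps SendRawBunch's ordering and conditions for the fields it includes;
// the ChIndex and ChSequence widths are illustrative only.
void WriteSimplifiedHeader(BitWriter& W, const HeaderFlags& F, uint32_t ChIndex, uint32_t ChSequence) {
    const bool bOpenOrClose = F.bOpen || F.bClose;
    W.WriteBit(bOpenOrClose);
    if (bOpenOrClose) {
        W.WriteBit(F.bOpen);
        W.WriteBit(F.bClose);
    }
    W.WriteBit(F.bReliable);
    W.WriteBits(ChIndex, 8);          // stands in for SerializeIntPacked
    W.WriteBit(F.bPartial);
    if (F.bReliable) {
        W.WriteBits(ChSequence, 10);  // stands in for WriteIntWrapped(..., MAX_CHSEQUENCE)
    }
    if (F.bPartial) {
        W.WriteBit(F.bPartialInitial);
        W.WriteBit(F.bPartialFinal);
    }
}
```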
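SendRawBunch fills SendBunchHeader with the bunch's metadata; fields such as the open/close flags, the reliable sequence number, the partial flags and the channel name are written only when they apply. PrepareWriteBitsToSendBuffer, called before the bunch is written, first flushes the current packet if the header plus the bunch would not fit into the remaining space. If the SendBuffer is then empty (a new packet is starting), it writes the packet header with WritePacketHeader and reserves space for the packet info with WriteDummyPacketInfo; both are overwritten with their final values when the packet is flushed: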
void UNetConnection::PrepareWriteBitsToSendBuffer(const int32 SizeInBits, const int32 ExtraSizeInBits)
{
ValidateSendBuffer();
#if !UE_BUILD_SHIPPING
// Now that the stateless handshake is responsible for initializing the packet sequence numbers,
// we can't allow any packets to be written to the send buffer until after this has completed
if (CVarRandomizeSequence.GetValueOnAnyThread() > 0)
{
checkf(!Handler.IsValid() || Handler->IsFullyInitialized(), TEXT("Attempted to write to send buffer before packet handler was fully initialized. Connection: %s"), *Describe());
}
#endif
const int32 TotalSizeInBits = SizeInBits + ExtraSizeInBits;
// Flush if we can't add to current buffer
if ( TotalSizeInBits > GetFreeSendBufferBits() )
{
FlushNet();
}
// If this is the start of the queue, make sure to add the packet id
if ( SendBuffer.GetNumBits() == 0 && !IsInternalAck() )
{
// Write Packet Header, before sending the packet we will go back and rewrite the data
WritePacketHeader(SendBuffer);
// Pre-write the bits for the packet info
WriteDummyPacketInfo(SendBuffer);
// We do not allow the first bunch to merge with the ack data as this will "revert" the ack data.
AllowMerge = false;
// Update stats for PacketIdBits and ackdata (also including the data used for packet RTT and saturation calculations)
int64 BitsWritten = SendBuffer.GetNumBits();
NumPacketIdBits += FNetPacketNotify::SequenceNumberT::SeqNumberBits;
NumAckBits += BitsWritten - FNetPacketNotify::SequenceNumberT::SeqNumberBits;
// Report stats to profiler
NETWORK_PROFILER( GNetworkProfiler.TrackSendAck( NumAckBits, this ) );
ValidateSendBuffer();
}
}
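PrepareWriteBitsToSendBuffer and FlushNet together follow a "reserve now, backfill at flush" pattern: header bits are written as placeholders when a packet starts and overwritten with their final values just before sending. The hypothetical `PacketBuilder` below shows the pattern with a made-up 16-bit header stored in a `std::vector<bool>`; it is not how FBitWriter stores bits.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical builder illustrating "reserve now, backfill at flush".
class PacketBuilder {
public:
    static constexpr int kHeaderBits = 16; // made-up header width

    // Appends payload bits; the first write of a packet reserves the header first.
    void WritePayload(uint32_t Value, int Count) {
        ReserveHeaderIfEmpty();
        WriteValue(Value, Count);
    }

    // Overwrites the reserved header bits with their final value and returns the packet bits.
    const std::vector<bool>& Finish(uint32_t FinalHeader) {
        ReserveHeaderIfEmpty();
        for (int i = 0; i < kHeaderBits; ++i) {
            Bits[HeaderMark + static_cast<std::size_t>(i)] = ((FinalHeader >> i) & 1u) != 0;
        }
        return Bits;
    }

    // Reads Count bits starting at Pos, least significant bit first.
    static uint32_t ReadValue(const std::vector<bool>& B, std::size_t Pos, int Count) {
        uint32_t V = 0;
        for (int i = 0; i < Count; ++i) {
            if (B[Pos + static_cast<std::size_t>(i)]) { V |= 1u << i; }
        }
        return V;
    }

private:
    void ReserveHeaderIfEmpty() {
        if (Bits.empty()) {
            HeaderMark = Bits.size();   // always 0 here; mirrors remembering a writer mark
            WriteValue(0, kHeaderBits); // placeholder bits
        }
    }
    void WriteValue(uint32_t Value, int Count) {
        for (int i = 0; i < Count; ++i) { Bits.push_back(((Value >> i) & 1u) != 0); }
    }

    std::vector<bool> Bits;
    std::size_t HeaderMark = 0;
};
```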
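With the header space reserved, WriteBitsToSendBufferInternal appends the SendBunchHeader bits followed by the bunch bits to the end of the UNetConnection's SendBuffer and returns the id of the packet being built: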
int32 UNetConnection::WriteBitsToSendBufferInternal(
const uint8 * Bits,
const int32 SizeInBits,
const uint8 * ExtraBits,
const int32 ExtraSizeInBits,
EWriteBitsDataType DataType)
{
// Remember start position in case we want to undo this write, no meaning to undo the header write as this is only used to pop bunches and the header should not count towards the bunch
// Store this after the possible flush above so we have the correct start position in the case that we do flush
LastStart = FBitWriterMark( SendBuffer );
// Add the bits to the queue
if ( SizeInBits )
{
SendBuffer.SerializeBits( const_cast< uint8* >( Bits ), SizeInBits );
ValidateSendBuffer();
}
// Add any extra bits
if ( ExtraSizeInBits )
{
SendBuffer.SerializeBits( const_cast< uint8* >( ExtraBits ), ExtraSizeInBits );
ValidateSendBuffer();
}
const int32 RememberedPacketId = OutPacketId;
switch ( DataType )
{
case EWriteBitsDataType::Bunch:
NumBunchBits += SizeInBits + ExtraSizeInBits;
break;
default:
break;
}
// Flush now if we are full
if (GetFreeSendBufferBits() == 0
#if !UE_BUILD_SHIPPING
|| CVarForceNetFlush.GetValueOnAnyThread() != 0
#endif
)
{
FlushNet();
}
return RememberedPacketId;
}
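Because of the MTU limit, the SendBuffer cannot grow without bound, so its internal buffer is created with a capacity of MaxPacket * 8 bits (MaxPacket is at most 1024 bytes) minus the bits reserved for the packet handler, MaxPacketHandlerBits: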
void UNetConnection::InitSendBuffer()
{
check(MaxPacket > 0);
int32 FinalBufferSize = (MaxPacket * 8) - MaxPacketHandlerBits;
// Initialize the one outgoing buffer.
if (FinalBufferSize == SendBuffer.GetMaxBits())
{
// Reset all of our values to their initial state without a malloc/free
SendBuffer.Reset();
}
else
{
// First time initialization needs to allocate the buffer
SendBuffer = FBitWriter(FinalBufferSize);
}
HeaderMarkForPacketInfo.Reset();
ResetPacketBitCounts();
ValidateSendBuffer();
}
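If the SendBuffer has no free bits left after a write, WriteBitsToSendBufferInternal calls FlushNet immediately to send the SendBuffer contents (in non-shipping builds, CVarForceNetFlush forces this after every write); a bunch that does not fit into the remaining space triggers a flush in PrepareWriteBitsToSendBuffer before it is written. The logic of FlushNet can be roughly simplified to the following code: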
void UNetConnection::FlushNet(bool bIgnoreSimulation)
{
check(Driver);
// Update info.
ValidateSendBuffer();
LastEnd = FBitWriterMark();
TimeSensitive = 0;
const double PacketSentTimeInS = FPlatformTime::Seconds();
// Write the UNetConnection-level termination bit
SendBuffer.WriteBit(1);
// Refresh outgoing header with latest data
if ( !IsInternalAck() )
{
// if we update ack, we also update received ack associated with outgoing seq
// so we know how many ack bits we need to write (which is updated in received packet)
WritePacketHeader(SendBuffer);
WriteFinalPacketInfo(SendBuffer, PacketSentTimeInS);
}
// Send now.
// Checked in FlushNet() so each child class doesn't have to implement this
if (Driver->IsNetResourceValid())
{
LowLevelSend(SendBuffer.GetData(), SendBuffer.GetNumBits(), Traits);
}
// Update stuff.
const int32 Index = OutPacketId & (UE_ARRAY_COUNT(OutLagPacketId)-1);
// Remember the actual time this packet was sent out, so we can compute ping when the ack comes back
OutLagPacketId[Index] = OutPacketId;
OutLagTime[Index] = PacketSentTimeInS;
OutBytesPerSecondHistory[Index] = FMath::Min(OutBytesPerSecond / 1024, 255);
// Increase outgoing sequence number
if (!IsInternalAck())
{
PacketNotify.CommitAndIncrementOutSeq();
}
// Make sure that we always push an ChannelRecordEntry for each transmitted packet even if it is empty
FChannelRecordImpl::PushPacketId(ChannelRecord, OutPacketId);
++OutPackets;
++OutTotalPackets;
Driver->OutPackets++;
Driver->OutTotalPackets++;
//Record the first packet time in the histogram
if (!bFlushedNetThisFrame)
{
double LastPacketTimeDiffInMs = (Driver->GetElapsedTime() - LastSendTime) * 1000.0;
NetConnectionHistogram.AddMeasurement(LastPacketTimeDiffInMs);
}
LastSendTime = Driver->GetElapsedTime();
++OutPacketId;
}
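The OutLagPacketId/OutLagTime bookkeeping in FlushNet is a ring buffer indexed by `PacketId & (Size - 1)`, which only works when the size is a power of two. The sketch below (a hypothetical `LagTracker` with an assumed size of 256) records send times and turns an ack into an RTT sample, using each slot at most once just as ReadPacketInfo resets `OutLagPacketId[Index]` to -1. It ignores the extra timing data carried in the packet info.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical ring buffer of send times modelled on OutLagPacketId/OutLagTime.
class LagTracker {
public:
    static constexpr int32_t kSize = 256; // assumed size; must be a power of two

    LagTracker() {
        for (int32_t& Id : PacketIds) { Id = -1; } // -1 marks an empty or consumed slot
    }
    // Called when a packet is flushed.
    void OnSent(int32_t PacketId, double SendTimeS) {
        const int32_t Index = PacketId & (kSize - 1);
        PacketIds[Index] = PacketId;
        SendTimes[Index] = SendTimeS;
    }
    // Called when the ack for PacketId arrives. Returns the RTT in seconds, or -1.0
    // if the slot was already consumed or overwritten by a newer packet id.
    double OnAcked(int32_t PacketId, double NowS) {
        const int32_t Index = PacketId & (kSize - 1);
        if (PacketIds[Index] != PacketId) { return -1.0; }
        PacketIds[Index] = -1; // use each ack only once
        return NowS - SendTimes[Index];
    }

private:
    int32_t PacketIds[kSize];
    double SendTimes[kSize] = {};
};
```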
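In FlushNet, SendBuffer.WriteBit(1) appends a single 1 bit to the end of the SendBuffer as the termination marker of the packet. Because UDP carries whole bytes, the packet is sent as the number of bytes needed to hold all of its bits, so zero or more 0 bits follow the termination bit, padding the total to a multiple of 8.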
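The termination bit and the padding can be sketched in standard C++. `PackWithTerminator` is a hypothetical helper that takes the packet's bits in write order, appends the 1 bit, and packs everything least-significant-bit first into bytes (the bit order the receive code's comment describes), so any unused high bits of the last byte remain 0.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: append the termination bit, then pack LSB-first into bytes.
std::vector<uint8_t> PackWithTerminator(const std::vector<bool>& PayloadBits) {
    std::vector<bool> Bits = PayloadBits;
    Bits.push_back(true); // termination bit, like SendBuffer.WriteBit(1)
    std::vector<uint8_t> Bytes((Bits.size() + 7) / 8, 0); // zero-filled, so padding bits are 0
    for (std::size_t i = 0; i < Bits.size(); ++i) {
        if (Bits[i]) {
            Bytes[i / 8] = static_cast<uint8_t>(Bytes[i / 8] | (1u << (i % 8)));
        }
    }
    return Bytes;
}
```

For example, the payload bits 1, 0, 1 plus the terminator become the single byte 0x0D, and 8 payload bits of 0 need a second byte that holds only the terminator (0x01).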
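WritePacketHeader writes the sequence-number information used for reliable delivery; its details will be covered later in the section on reliable transmission. WriteFinalPacketInfo writes the information used to measure the latency between server and client, 20 bits of data; in essence, it attaches timing information when the packet is sent, while the send time itself is recorded locally in the OutLagTime array (see FlushNet above).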
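Both functions write into the space at the front of the SendBuffer that PrepareWriteBitsToSendBuffer reserved earlier. Once they have run, the SendBuffer holds the final binary packet, laid out as follows:
    packet header (FNotificationHeader, WritePacketHeader) | packet info (WriteFinalPacketInfo) | bunch header 1 | bunch 1 data | ... | bunch header N | bunch N data | termination bit (1) | zero padding to a byte boundary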

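Finally, LowLevelSend sends the SendBuffer data. LowLevelSend eventually calls the platform-specific FSocket::SendTo, which is FSocketWindows on Windows and FSocketBSD on Linux. The excerpt below shows only the branch that dispatches SendTo as a task-graph task (when CVarNetIpConnectionUseSendTasks is enabled):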
void UIpConnection::LowLevelSend(void* Data, int32 CountBits, FOutPacketTraits& Traits)
{
// (many branches omitted)
// Send to remote.
FSocketSendResult SendResult;
CLOCK_CYCLES(Driver->SendCycles);
if ( CountBytes > MaxPacket )
{
UE_LOG( LogNet, Warning, TEXT( "UIpConnection::LowLevelSend: CountBytes > MaxPacketSize! Count: %i, MaxPacket: %i %s" ), CountBytes, MaxPacket, *Describe() );
}
FPacketAudit::NotifyLowLevelSend((uint8*)DataToSend, CountBytes, CountBits);
if (CountBytes > 0)
{
const bool bNotifyOnSuccess = (SocketErrorDisconnectDelay > 0.f) && (SocketError_SendDelayStartTime != 0.f);
FSocket* CurSocket = GetSocket();
if (CVarNetIpConnectionUseSendTasks.GetValueOnAnyThread() != 0)
{
DECLARE_CYCLE_STAT(TEXT("IpConnection SendTo task"), STAT_IpConnection_SendToTask, STATGROUP_TaskGraphTasks);
FGraphEventArray Prerequisites;
if (LastSendTask.IsValid())
{
Prerequisites.Add(LastSendTask);
}
ISocketSubsystem* const SocketSubsystem = Driver->GetSocketSubsystem();
LastSendTask = FFunctionGraphTask::CreateAndDispatchWhenReady([this, Packet = TArray<uint8>(DataToSend, CountBytes), SocketSubsystem, bNotifyOnSuccess]
{
FSocket* CurSocket = GetSocket();
if (CurSocket != nullptr)
{
bool bWasSendSuccessful = false;
UIpConnection::FSocketSendResult Result;
{
SCOPE_CYCLE_COUNTER(STAT_IpConnection_SendToSocket);
bWasSendSuccessful = CurSocket->SendTo(Packet.GetData(), Packet.Num(), Result.BytesSent, *RemoteAddr);
}
}
},
GET_STATID(STAT_IpConnection_SendToTask), &Prerequisites);
}
}
}
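SendTo is still a wrapper over the platform's network interface; only the platform-specific subclass calls the actual operating-system API, here sendto: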
bool FSocketBSD::SendTo(const uint8* Data, int32 Count, int32& BytesSent, const FInternetAddr& Destination)
{
// TODO: Consider converting IPv4 addresses to v6 when needed
if (Destination.GetProtocolType() != GetProtocol())
{
UE_LOG(LogSockets, Warning, TEXT("Destination protocol of '%s' does not match protocol: '%s' for address: '%s'"),
*Destination.GetProtocolType().ToString(), *GetProtocol().ToString(), *Destination.ToString(true));
return false;
}
const FInternetAddrBSD& BSDAddr = static_cast<const FInternetAddrBSD&>(Destination);
// Write the data and see how much was written
BytesSent = sendto(Socket, (const char*)Data, Count, SendFlags, (const sockaddr*)&(BSDAddr.Addr), BSDAddr.GetStorageSize());
// NETWORK_PROFILER(FSocket::SendTo(Data,Count,BytesSent,Destination));
bool Result = BytesSent >= 0;
if (Result)
{
LastActivityTime = FPlatformTime::Seconds();
}
return Result;
}
/**
* Implements a Windows/BSD network socket.
*/
class FSocketWindows
: public FSocketBSD
{
public:
FSocketWindows(SOCKET InSocket, ESocketType InSocketType, const FString& InSocketDescription, const FName& InSocketProtocol, ISocketSubsystem* InSubsystem)
: FSocketBSD(InSocket, InSocketType, InSocketDescription, InSocketProtocol, InSubsystem)
{ }
// FSocketBSD overrides
virtual bool Shutdown(ESocketShutdownMode Mode) override;
virtual bool SetIpPktInfo(bool bEnable) override;
virtual bool RecvFromWithPktInfo(uint8* Data, int32 BufferSize, int32& BytesRead, FInternetAddr& Source, FInternetAddr& Destination, ESocketReceiveFlags::Type Flags = ESocketReceiveFlags::None) override;
protected:
LPFN_WSARECVMSG WSARecvMsg = nullptr;
};
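Note that, to keep the network I/O model as uniform as possible across platforms, the low-level networking code uses only the most basic socket calls instead of advanced features such as I/O multiplexing (select/epoll) or I/O completion ports. Because UDP is connectionless, a single socket receives datagrams from every client, so there is no need to wait on many sockets at once; and because a single UE server generally serves no more than about 200 client connections, the simple model of polling that socket every frame is sufficient.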
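The single-socket point can be shown with plain POSIX calls. The sketch below (POSIX only; `OpenNonBlockingUdp`, `DrainSocket` and `SendLoopback` are hypothetical helpers, not UE's FSocket API) opens one non-blocking UDP socket on the loopback interface and, once per "tick", reads every queued datagram, whichever peer sent it, until recvfrom reports that nothing is left. Polling like this requires a non-blocking socket; otherwise the drain loop would stall the frame when the queue is empty.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <vector>

// Opens one non-blocking UDP socket bound to 127.0.0.1 on an OS-chosen port.
int OpenNonBlockingUdp(uint16_t& OutPort) {
    const int Fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (Fd < 0) { return -1; }
    sockaddr_in Addr{};
    Addr.sin_family = AF_INET;
    Addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    Addr.sin_port = htons(0);
    socklen_t Len = sizeof(Addr);
    const int Flags = fcntl(Fd, F_GETFL, 0);
    if (Flags == -1 || fcntl(Fd, F_SETFL, Flags | O_NONBLOCK) == -1 ||
        bind(Fd, reinterpret_cast<sockaddr*>(&Addr), sizeof(Addr)) != 0 ||
        getsockname(Fd, reinterpret_cast<sockaddr*>(&Addr), &Len) != 0) {
        close(Fd);
        return -1;
    }
    OutPort = ntohs(Addr.sin_port);
    return Fd;
}

// One "tick": read every datagram already queued, from any sender, until
// recvfrom fails (EAGAIN/EWOULDBLOCK once the queue is empty).
std::vector<std::vector<uint8_t>> DrainSocket(int Fd) {
    std::vector<std::vector<uint8_t>> Packets;
    uint8_t Buffer[1024]; // sized like MAX_PACKET_SIZE
    for (;;) {
        sockaddr_in From{}; // source address: what a real driver would route on
        socklen_t FromLen = sizeof(From);
        const ssize_t N = recvfrom(Fd, Buffer, sizeof(Buffer), 0,
                                   reinterpret_cast<sockaddr*>(&From), &FromLen);
        if (N < 0) {
            if (errno == EINTR) { continue; }
            break; // EAGAIN/EWOULDBLOCK: nothing left this tick; other errors also stop
        }
        Packets.emplace_back(Buffer, Buffer + N);
    }
    return Packets;
}

// Sends one datagram from Fd to 127.0.0.1:Port.
bool SendLoopback(int Fd, uint16_t Port, const std::vector<uint8_t>& Data) {
    sockaddr_in To{};
    To.sin_family = AF_INET;
    To.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    To.sin_port = htons(Port);
    const ssize_t Sent = sendto(Fd, Data.data(), Data.size(), 0,
                                reinterpret_cast<const sockaddr*>(&To), sizeof(To));
    return Sent == static_cast<ssize_t>(Data.size());
}
```

In a game loop, DrainSocket would run once per frame, and each datagram would then be routed to a connection by its source address.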
Receiving Messages
Receiving is driven by the tick: when the World ticks, it triggers the corresponding NetDriver::TickDispatch, which is registered as follows:
void UNetDriver::RegisterTickEvents(class UWorld* InWorld)
{
if (InWorld)
{
TickDispatchDelegateHandle = InWorld->OnTickDispatch ().AddUObject(this, &UNetDriver::TickDispatch);
PostTickDispatchDelegateHandle = InWorld->OnPostTickDispatch().AddUObject(this, &UNetDriver::PostTickDispatch);
TickFlushDelegateHandle = InWorld->OnTickFlush ().AddUObject(this, &UNetDriver::TickFlush);
PostTickFlushDelegateHandle = InWorld->OnPostTickFlush ().AddUObject(this, &UNetDriver::PostTickFlush);
}
}
In TickDispatch, an iterator is used to walk over the packets received so far:
void UIpNetDriver::TickDispatch(float DeltaTime)
{
LLM_SCOPE(ELLMTag::Networking);
Super::TickDispatch( DeltaTime );
#if !UE_BUILD_SHIPPING
PauseReceiveEnd = (PauseReceiveEnd != 0.f && PauseReceiveEnd - (float)FPlatformTime::Seconds() > 0.f) ? PauseReceiveEnd : 0.f;
if (PauseReceiveEnd != 0.f)
{
return;
}
#endif
// Set the context on the world for this driver's level collection.
const int32 FoundCollectionIndex = World ? World->GetLevelCollections().IndexOfByPredicate([this](const FLevelCollection& Collection)
{
return Collection.GetNetDriver() == this;
}) : INDEX_NONE;
FScopedLevelCollectionContextSwitch LCSwitch(FoundCollectionIndex, World);
DDoS.PreFrameReceive(DeltaTime);
ISocketSubsystem* SocketSubsystem = GetSocketSubsystem();
bool bRetrieveTimestamps = CVarNetUseRecvTimestamps.GetValueOnAnyThread() != 0;
// Process all incoming packets
for (FPacketIterator It(this); It; ++It)
{
FReceivedPacketView ReceivedPacket;
FInPacketTraits& ReceivedTraits = ReceivedPacket.Traits;
bool bOk = It.GetCurrentPacket(ReceivedPacket);
const TSharedRef<const FInternetAddr> FromAddr = ReceivedPacket.Address.ToSharedRef();
UNetConnection* Connection = nullptr;
UIpConnection* const MyServerConnection = GetServerConnection();
// (packet handling logic omitted for now)
}
}
Both the FPacketIterator constructor and its increment operator call AdvanceCurrentPacket to fetch the next packet:
FPacketIterator(UIpNetDriver* InDriver, FRecvMulti* InRMState, double InStartReceiveTime, bool bInCheckReceiveTime)
{
if (!bUseRecvMulti && SocketSubsystem != nullptr)
{
CurrentPacket.Address = SocketSubsystem->CreateInternetAddr();
}
AdvanceCurrentPacket();
}
FORCEINLINE FPacketIterator& operator++()
{
IterationCount++;
AdvanceCurrentPacket();
return *this;
}
/**
* Advances the current packet to the next iteration
*/
void AdvanceCurrentPacket()
{
// (much code omitted)
if (bUseRecvMulti)
{
// (batched RecvMulti receive path omitted)
// At this point, bBreak will be set, or RecvMultiPacketCount will be > 0
}
else
{
bBreak = !ReceiveSinglePacket();
}
}
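AdvanceCurrentPacket takes a different path depending on bUseRecvMulti, which switches to receiving multiple packets per system call (RecvMulti). This switch is off by default, so we only look at the ordinary path, which reads one packet at a time directly from the socket: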
/**
* Receives a single packet from the network socket, outputting to the CurrentPacket buffer.
*
* @return Whether or not a packet or an error was successfully received
*/
bool ReceiveSinglePacket()
{
bool bReceivedPacketOrError = false;
CurrentPacket.bRecvSuccess = false;
CurrentPacket.Data.SetNumUninitialized(0, false);
if (CurrentPacket.Address.IsValid())
{
CurrentPacket.Address->SetAnyAddress();
}
CurrentPacket.PacketTimestamp = 0.0;
CurrentPacket.Error = SE_NO_ERROR;
while (true)
{
bReceivedPacketOrError = false;
if (SocketReceiveThreadRunnable != nullptr)
{
// (receive-thread path omitted)
}
else if (Driver->GetSocket() != nullptr && SocketSubsystem != nullptr)
{
SCOPE_CYCLE_COUNTER(STAT_IpNetDriver_RecvFromSocket);
int32 BytesRead = 0;
bool bReceivedPacket = Driver->GetSocket()->RecvFrom(CurrentPacket.Data.GetData(), MAX_PACKET_SIZE, BytesRead, *CurrentPacket.Address);
CurrentPacket.bRecvSuccess = bReceivedPacket;
bReceivedPacketOrError = bReceivedPacket;
if (bReceivedPacket)
{
// Fixed allocator, so no risk of realloc from copy-then-resize
CurrentPacket.Data.SetNumUninitialized(BytesRead, false);
}
else
{
// (error handling omitted)
}
}
// While loop only exists to allow 'continue' for DDoS and invalid packet code, above
break;
}
return bReceivedPacketOrError;
}
ReceiveSinglePacket calls Driver->GetSocket()->RecvFrom to read one packet from the UDP socket into CurrentPacket.Data. The caller then uses GetCurrentPacket to obtain a view (DataView) of the received packet:
/**
* Retrieves the packet information from the current iteration. Avoid calling more than once, per iteration.
*
* @param OutPacket Outputs a view to the received packet data
* @return Returns whether or not receiving was successful for the current packet
*/
bool GetCurrentPacket(FReceivedPacketView& OutPacket)
{
bool bRecvSuccess = false;
if (bUseRecvMulti)
{
RMState->GetPacket(RecvMultiIdx, OutPacket);
bRecvSuccess = true;
}
else
{
OutPacket.DataView = {CurrentPacket.Data.GetData(), CurrentPacket.Data.Num(), ECountUnits::Bytes};
OutPacket.Error = CurrentPacket.Error;
OutPacket.Address = CurrentPacket.Address;
bRecvSuccess = CurrentPacket.bRecvSuccess;
}
return bRecvSuccess;
}
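Once a valid FReceivedPacketView has been obtained, it is handed to the ReceivedRawPacket function of the matching connection. Finding that connection is simple: on a client, the server connection MyServerConnection is used directly (provided the source address matches); on a server, the source address is looked up in MappedClientConnections: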
// Figure out which socket the received data came from.
if (MyServerConnection)
{
if (MyServerConnection->RemoteAddr->CompareEndpoints(*FromAddr))
{
Connection = MyServerConnection;
}
else
{
UE_LOG(LogNet, Warning, TEXT("Incoming ip address doesn't match expected server address: Actual: %s Expected: %s"),
*FromAddr->ToString(true),
MyServerConnection->RemoteAddr.IsValid() ? *MyServerConnection->RemoteAddr->ToString(true) : TEXT("Invalid"));
}
}
if (Connection == nullptr)
{
UNetConnection** Result = MappedClientConnections.Find(FromAddr);
if (Result != nullptr)
{
UNetConnection* ConnVal = *Result;
if (ConnVal != nullptr)
{
Connection = ConnVal;
}
else
{
ReceivedTraits.bFromRecentlyDisconnected = true;
}
}
check(Connection == nullptr || CastChecked<UIpConnection>(Connection)->RemoteAddr->CompareEndpoints(*FromAddr));
}
bool bIgnorePacket = false;
// If we didn't find a client connection, maybe create a new one.
if (Connection == nullptr)
{
// (code that creates a new connection omitted)
}
// Send the packet to the connection for processing.
if (Connection != nullptr && !bIgnorePacket)
{
if (bRetrieveTimestamps)
{
It.GetCurrentPacketTimestamp(Connection);
}
Connection->ReceivedRawPacket((uint8*)ReceivedPacket.DataView.GetData(), ReceivedPacket.DataView.NumBytes());
}
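The server-side lookup can be modelled with an ordinary hash map. In this sketch, `Connection`, `LookupResult` and `FindConnection` are hypothetical, and addresses are plain "ip:port" strings instead of FInternetAddr objects. As in the excerpt above, a key that is present with a null value marks a recently disconnected client, and a missing key is the case where the omitted code may create a new connection.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

struct Connection { std::string Name; }; // hypothetical stand-in for UNetConnection

enum class LookupResult { Found, RecentlyDisconnected, Unknown };

// Server-side lookup keyed by "ip:port", modelled on MappedClientConnections.
LookupResult FindConnection(const std::unordered_map<std::string, Connection*>& Mapped,
                            const std::string& FromAddr, Connection*& Out) {
    Out = nullptr;
    auto It = Mapped.find(FromAddr);
    if (It == Mapped.end()) {
        return LookupResult::Unknown;              // candidate for a new connection
    }
    if (It->second == nullptr) {
        return LookupResult::RecentlyDisconnected; // bFromRecentlyDisconnected case
    }
    Out = It->second;
    return LookupResult::Found;
}
```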
UNetConnection::ReceivedRawPacket first passes the received data through the packet Handler, which may run steps such as decryption and decompression:
void UNetConnection::ReceivedRawPacket( void* InData, int32 Count )
{
#if !UE_BUILD_SHIPPING
// Add an opportunity for the hook to block further processing
bool bBlockReceive = false;
ReceivedRawPacketDel.ExecuteIfBound(InData, Count, bBlockReceive);
if (bBlockReceive)
{
return;
}
#endif
#if DO_ENABLE_NET_TEST
// Opportunity for packet loss burst simulation to drop the incoming packet.
if (Driver && Driver->IsSimulatingPacketLossBurst())
{
return;
}
#endif
uint8* Data = (uint8*)InData;
if (Handler.IsValid())
{
const ProcessedPacket UnProcessedPacket = Handler->Incoming(Data, Count);
if (!UnProcessedPacket.bError)
{
Count = FMath::DivideAndRoundUp(UnProcessedPacket.CountBits, 8);
if (Count > 0)
{
Data = UnProcessedPacket.Data;
}
// This packed has been consumed
else
{
return;
}
}
}
// Handle an incoming raw packet from the driver.
UE_LOG(LogNetTraffic, Verbose, TEXT("%6.3f: Received %i"), FPlatformTime::Seconds() - GStartTime, Count );
int32 PacketBytes = Count + PacketOverhead;
InBytes += PacketBytes;
InTotalBytes += PacketBytes;
++InPackets;
++InTotalPackets;
if (Driver)
{
Driver->InBytes += PacketBytes;
Driver->InTotalBytes += PacketBytes;
Driver->InPackets++;
Driver->InTotalPackets++;
}
if (Count > 0)
{
uint8 LastByte = Data[Count-1];
if (LastByte != 0)
{
int32 BitSize = (Count * 8) - 1;
// Bit streaming, starts at the Least Significant Bit, and ends at the MSB.
while (!(LastByte & 0x80))
{
LastByte *= 2;
BitSize--;
}
FBitReader Reader(Data, BitSize);
// Set the network version on the reader
Reader.SetEngineNetVer( EngineNetworkProtocolVersion );
Reader.SetGameNetVer( GameNetworkProtocolVersion );
if (Handler.IsValid())
{
Handler->IncomingHigh(Reader);
}
if (Reader.GetBitsLeft() > 0)
{
ReceivedPacket(Reader);
// Check if the out of order packet cache needs flushing
FlushPacketOrderCache();
}
}
}
}
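After the Handler has run, one more essential step remains: locating the last set bit of the Packet. As mentioned in the message-sending section, UE appends a single bit with value 1 to the end of every Packet. UDP, however, transmits whole bytes. When the SendBuffer does not fill a complete final byte, the bits after this marker are padded with 0. The receiver therefore gets whole bytes and must search backwards for that final 1 bit to find the real boundary. That is the job of the While loop over LastByte above: it computes BitSize, the number of real payload bits, excluding the terminating 1 bit itself. A last byte of 0 means the marker is missing, and the code above then does not process the packet any further.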
With the valid payload isolated, an FBitReader is constructed over it and ReceivedPacket is called to process it. At the start of ReceivedPacket, PacketNotify.ReadHeader reads the packet header, i.e. the FNotificationHeader:
// Read packet header
FNetPacketNotify::FNotificationHeader Header;
if (!PacketNotify.ReadHeader(Header, Reader))
{
CLOSE_CONNECTION_DUE_TO_SECURITY_VIOLATION(this, ESecurityEvent::Malformed_Packet, TEXT("Failed to read PacketHeader"));
return;
}
// (code handling out-of-order reception omitted for now)
// Extra information associated with the header (read only after acks have been processed)
if (PacketSequenceDelta > 0 && !ReadPacketInfo(Reader, bHasPacketInfoPayload))
{
CLOSE_CONNECTION_DUE_TO_SECURITY_VIOLATION(this, ESecurityEvent::Malformed_Packet, TEXT("Failed to read PacketHeader"));
return;
}
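After the FNotificationHeader has been read, the sequence number it carries is used to decide whether the current Packet arrived out of order. An out-of-order packet may be returned from without being processed. We set aside the details of in-order reception for now and assume the current Packet is the next one expected.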
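Once the Packet is confirmed to be in order, ReadPacketInfo reads the timestamp information the sender wrote with WriteFinalPacketInfo. It computes the round-trip time (RTT) from the current local time and the send time recorded in OutLagTime, and uses the result to update the Ping value: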
bool UNetConnection::ReadPacketInfo(FBitReader& Reader, bool bHasPacketInfoPayload)
{
// (PacketInfo parsing code omitted)
// Update ping
// At this time we have updated OutAckPacketId to the latest received ack.
const int32 Index = OutAckPacketId & (UE_ARRAY_COUNT(OutLagPacketId)-1);
if ( OutLagPacketId[Index] == OutAckPacketId )
{
OutLagPacketId[Index] = -1; // Only use the ack once
double PacketReceiveTime = 0.0;
FTimespan& RecvTimespan = LastOSReceiveTime.Timestamp;
if (!RecvTimespan.IsZero() && Driver != nullptr && CVarPingUsePacketRecvTime.GetValueOnAnyThread())
{
if (bIsOSReceiveTimeLocal)
{
PacketReceiveTime = RecvTimespan.GetTotalSeconds();
}
else if (ISocketSubsystem* SocketSubsystem = Driver->GetSocketSubsystem())
{
PacketReceiveTime = SocketSubsystem->TranslatePacketTimestamp(LastOSReceiveTime);
}
}
// use FApp's time because it is set closer to the beginning of the frame - we don't care about the time so far of the current frame to process the packet
const double CurrentTime = (PacketReceiveTime != 0.0 ? PacketReceiveTime : FApp::GetCurrentTime());
const double RTT = (CurrentTime - OutLagTime[Index] ) - ( CVarPingExcludeFrameTime.GetValueOnAnyThread() ? ServerFrameTime : 0.0 );
const double NewLag = FMath::Max( RTT, 0.0 );
//UE_LOG( LogNet, Warning, TEXT( "Out: %i, InRemote: %i, Saturation: %f" ), OutBytesPerSecondHistory[Index], RemoteInKBytesPerSecond, RemoteSaturation );
LagAcc += NewLag;
LagCount++;
if (PlayerController)
{
PlayerController->UpdatePing(NewLag);
}
if (NetworkCongestionControl.IsSet())
{
NetworkCongestionControl.GetValue().OnAck({ CurrentTime, OutAckPacketId });
}
}
}
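Below is a simplified sketch of the OutLagPacketId/OutLagTime ring buffer used above. It assumes a power-of-two size (256 here, a hypothetical choice) so that `PacketId & (Size - 1)` works as the index. `LagTracker` and its methods are hypothetical. The real code also handles OS receive timestamps and the optional ServerFrameTime subtraction; here these are reduced to a single `FrameTime` parameter.

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <optional>

// Hypothetical simplified version of the OutLagPacketId / OutLagTime arrays.
class LagTracker
{
public:
    static constexpr int Size = 256;                      // must be a power of two

    LagTracker() { PacketIds.fill(-1); Times.fill(0.0); }

    // Record the send time of a packet.
    void OnPacketSent(int PacketId, double SendTime)
    {
        const int Index = PacketId & (Size - 1);
        PacketIds[Index] = PacketId;
        Times[Index] = SendTime;
    }

    // On ack: return the RTT if the slot still belongs to AckedId. Each slot is
    // used only once, like OutLagPacketId[Index] = -1 in ReadPacketInfo.
    std::optional<double> OnAck(int AckedId, double Now, double FrameTime = 0.0)
    {
        const int Index = AckedId & (Size - 1);
        if (PacketIds[Index] != AckedId)
        {
            return std::nullopt;                          // overwritten or already used
        }
        PacketIds[Index] = -1;
        const double RTT = (Now - Times[Index]) - FrameTime;
        return std::max(RTT, 0.0);                        // same clamp as NewLag
    }

private:
    std::array<int, Size> PacketIds;
    std::array<double, Size> Times;
};
```

Indexing by the low bits keeps lookup O(1). The cost is that a slot is silently reused once 256 newer packets have been sent, which is why the stored id is checked before computing the RTT.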
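That covers parsing and handling the Packet header. Once the current Packet is known to be in order, the next step is to process the Bunch data it carries. A Packet can contain zero, one, or several Bunches, so a While loop is used. Each time a Bunch is parsed, the corresponding Channel is notified through ReceivedRawBunch: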
// Disassemble and dispatch all bunches in the packet.
while( !Reader.AtEnd() && State!=USOCK_Closed )
{
// For demo backwards compatibility, old replays still have this bit
if (IsInternalAck() && EngineNetworkProtocolVersion < EEngineNetworkVersionHistory::HISTORY_ACKS_INCLUDED_IN_HEADER)
{
const bool IsAckDummy = Reader.ReadBit() == 1u;
}
// Parse the bunch.
int32 StartPos = Reader.GetPosBits();
// Process Received data
{
// Parse the incoming data.
FInBunch Bunch( this );
int32 IncomingStartPos = Reader.GetPosBits();
uint8 bControl = Reader.ReadBit();
Bunch.PacketId = InPacketId;
Bunch.bOpen = bControl ? Reader.ReadBit() : 0;
Bunch.bClose = bControl ? Reader.ReadBit() : 0;
if (Bunch.EngineNetVer() < HISTORY_CHANNEL_CLOSE_REASON)
{
const uint8 bDormant = Bunch.bClose ? Reader.ReadBit() : 0;
Bunch.CloseReason = bDormant ? EChannelCloseReason::Dormancy : EChannelCloseReason::Destroyed;
}
else
{
Bunch.CloseReason = Bunch.bClose ? (EChannelCloseReason)Reader.ReadInt((uint32)EChannelCloseReason::MAX) : EChannelCloseReason::Destroyed;
}
Bunch.bIsReplicationPaused = Reader.ReadBit();
Bunch.bReliable = Reader.ReadBit();
if (Bunch.EngineNetVer() < HISTORY_MAX_ACTOR_CHANNELS_CUSTOMIZATION)
{
static const int OLD_MAX_ACTOR_CHANNELS = 10240;
Bunch.ChIndex = Reader.ReadInt(OLD_MAX_ACTOR_CHANNELS);
}
else
{
uint32 ChIndex;
Reader.SerializeIntPacked(ChIndex);
if (ChIndex >= (uint32)MaxChannelSize)
{
CLOSE_CONNECTION_DUE_TO_SECURITY_VIOLATION(this, ESecurityEvent::Malformed_Packet, TEXT("Bunch channel index exceeds channel limit"));
return;
}
Bunch.ChIndex = ChIndex;
}
UChannel* Channel = Channels[Bunch.ChIndex];
// (remaining bunch-parsing logic omitted)
// Dispatch the raw, unsequenced bunch to the channel.
bool bLocalSkipAck = false;
Channel->ReceivedRawBunch( Bunch, bLocalSkipAck ); //warning: May destroy channel.
if ( bLocalSkipAck )
{
bSkipAck = true;
}
Driver->InBunches++;
Driver->InTotalBunches++;
}
}
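The Channel is looked up by the Bunch's ChIndex; a given Channel has the same ChIndex on the server and on the client.
UChannel::ReceivedRawBunch handles a Bunch differently depending on whether it is reliable. Setting aside out-of-order reliable reception for now, the focus here is on how the pieces of a large Bunch that was split before sending (partial Bunches) are put back together: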
void UChannel::ReceivedRawBunch( FInBunch & Bunch, bool & bOutSkipAck )
{
SCOPE_CYCLE_COUNTER(Stat_ChannelReceivedRawBunch);
SCOPED_NAMED_EVENT(UChannel_ReceivedRawBunch, FColor::Green);
// Immediately consume the NetGUID portion of this bunch, regardless if it is partial or reliable.
// NOTE - For replays, we do this even earlier, to try and load this as soon as possible, in case there is an issue creating the channel
// If a replay fails to create a channel, we want to salvage as much as possible
if ( Bunch.bHasPackageMapExports && !Connection->IsInternalAck() )
{
Cast<UPackageMapClient>( Connection->PackageMap )->ReceiveNetGUIDBunch( Bunch );
if ( Bunch.IsError() )
{
UE_LOG( LogNetTraffic, Error, TEXT( "UChannel::ReceivedRawBunch: Bunch.IsError() after ReceiveNetGUIDBunch. ChIndex: %i" ), ChIndex );
return;
}
}
if ( Connection->IsInternalAck() && Broken )
{
return;
}
check(Connection->Channels[ChIndex]==this);
if ( Bunch.bReliable && Bunch.ChSequence != Connection->InReliable[ChIndex] + 1 )
{
// (out-of-order reliable bunch handling ignored for now)
}
else
{
bool bDeleted = ReceivedNextBunch( Bunch, bOutSkipAck );
if ( Bunch.IsError() )
{
UE_LOG( LogNetTraffic, Error, TEXT( "UChannel::ReceivedRawBunch: Bunch.IsError() after ReceivedNextBunch 1" ) );
return;
}
if (bDeleted)
{
return;
}
// Dispatch any waiting bunches.
while( InRec )
{
// We shouldn't hit this path on 100% reliable connections
check( !Connection->IsInternalAck() );
if( InRec->ChSequence!=Connection->InReliable[ChIndex]+1 )
break;
UE_LOG(LogNetTraffic, Log, TEXT(" Channel %d Unleashing queued bunch"), ChIndex );
FInBunch* Release = InRec;
InRec = InRec->Next;
NumInRec--;
// Just keep a local copy of the bSkipAck flag, since these have already been acked and it doesn't make sense on this context
// Definitely want to warn when this happens, since it's really not possible
bool bLocalSkipAck = false;
bDeleted = ReceivedNextBunch( *Release, bLocalSkipAck );
if ( bLocalSkipAck )
{
UE_LOG( LogNetTraffic, Warning, TEXT( "UChannel::ReceivedRawBunch: bLocalSkipAck == true for already acked packet" ) );
}
if ( Bunch.IsError() )
{
UE_LOG( LogNetTraffic, Error, TEXT( "UChannel::ReceivedRawBunch: Bunch.IsError() after ReceivedNextBunch 2" ) );
return;
}
delete Release;
if (bDeleted)
{
return;
}
//AssertInSequenced();
}
}
}
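The InReliable counter together with the InRec list forms a per-channel reorder buffer. Here is a minimal sketch of that idea. It assumes sequence numbers never wrap; the real ChSequence does wrap, and UE keeps InRec as a sorted linked list rather than a `std::map`. `ReliableReorderBuffer` is a hypothetical name.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical per-channel reorder buffer for reliable bunches.
class ReliableReorderBuffer
{
public:
    explicit ReliableReorderBuffer(int LastDelivered) : InReliable(LastDelivered) {}

    // Receive one reliable bunch; returns the payloads that can now be
    // dispatched, in order.
    std::vector<std::string> Receive(int Seq, const std::string& Payload)
    {
        std::vector<std::string> Ready;
        if (Seq <= InReliable || Pending.count(Seq))
        {
            return Ready;                                 // duplicate: already handled or queued
        }
        if (Seq != InReliable + 1)
        {
            Pending[Seq] = Payload;                       // gap: hold it back (like InRec)
            return Ready;
        }
        InReliable = Seq;
        Ready.push_back(Payload);
        // Drain queued bunches that now continue the sequence.
        for (auto It = Pending.begin(); It != Pending.end() && It->first == InReliable + 1;
             It = Pending.erase(It))
        {
            InReliable = It->first;
            Ready.push_back(It->second);
        }
        return Ready;
    }

    int LastDelivered() const { return InReliable; }

private:
    int InReliable;                                       // last in-order sequence delivered
    std::map<int, std::string> Pending;                   // out-of-order bunches, sorted by sequence
};
```

For instance, with 9 already delivered, receiving 11 returns nothing. Receiving 10 afterwards returns both 10 and 11.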
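This function calls ReceivedNextBunch to derive a HandleBunch from the current Bunch; HandleBunch represents one complete logical message. If the current Bunch was not split, HandleBunch is simply the Bunch passed in. If it is a partial Bunch, it must first be merged with the other pieces of the large Bunch: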
bool UChannel::ReceivedNextBunch( FInBunch & Bunch, bool & bOutSkipAck )
{
// We received the next bunch. Basically at this point:
// -We know this is in order if reliable
// -We dont know if this is partial or not
// If its not a partial bunch, of it completes a partial bunch, we can call ReceivedSequencedBunch to actually handle it
// Note this bunch's retirement.
if ( Bunch.bReliable )
{
// Reliables should be ordered properly at this point
check( Bunch.ChSequence == Connection->InReliable[Bunch.ChIndex] + 1 );
Connection->InReliable[Bunch.ChIndex] = Bunch.ChSequence;
}
FInBunch* HandleBunch = &Bunch;
if (Bunch.bPartial)
{
HandleBunch = NULL;
// (partial-bunch merging code omitted for now)
}
// (remaining code omitted)
}
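Now let's look at how UChannel::ReceivedNextBunch handles a partial Bunch. It first checks whether this Bunch is the first piece of a split Bunch, i.e. whether bPartialInitial is true. If so, it checks whether another split Bunch is already being assembled; that in-progress Bunch is stored in the Channel's InPartialBunch field.

If an InPartialBunch exists and has not yet received its final piece, the earlier split sequence was interrupted, because some of its pieces never arrived. The handling depends on reliability:
- If the unfinished InPartialBunch is reliable, a new reliable initial piece is treated as an error.
- If the unfinished InPartialBunch is reliable and the new initial piece is unreliable, the new piece is dropped and bOutSkipAck is set so the packet is not acknowledged.
- If the unfinished InPartialBunch is unreliable, it is simply discarded with a log entry.

Whenever processing continues, the old InPartialBunch is deleted: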
if (Bunch.bPartialInitial)
{
// Create new InPartialBunch if this is the initial bunch of a new sequence.
if (InPartialBunch != NULL)
{
if (!InPartialBunch->bPartialFinal)
{
if ( InPartialBunch->bReliable )
{
if ( Bunch.bReliable )
{
UE_LOG(LogNetPartialBunch, Warning, TEXT("Reliable partial trying to destroy reliable partial 1. %s"), *Describe());
Bunch.SetError();
return false;
}
UE_LOG(LogNetPartialBunch, Log, TEXT( "Unreliable partial trying to destroy reliable partial 1") );
bOutSkipAck = true;
return false;
}
// We didn't complete the last partial bunch - this isn't fatal since they can be unreliable, but may want to log it.
UE_LOG(LogNetPartialBunch, Verbose, TEXT("Incomplete partial bunch. Channel: %d ChSequence: %d"), InPartialBunch->ChIndex, InPartialBunch->ChSequence);
}
delete InPartialBunch;
InPartialBunch = NULL;
}
InPartialBunch = new FInBunch(Bunch, false);
if ( !Bunch.bHasPackageMapExports && Bunch.GetBitsLeft() > 0 )
{
if ( Bunch.GetBitsLeft() % 8 != 0 )
{
UE_LOG(LogNetPartialBunch, Warning, TEXT("Corrupt partial bunch. Initial partial bunches are expected to be byte-aligned. BitsLeft = %u. %s"), Bunch.GetBitsLeft(), *Describe());
Bunch.SetError();
return false;
}
InPartialBunch->AppendDataFromChecked( Bunch.GetDataPosChecked(), Bunch.GetBitsLeft() );
LogPartialBunch(TEXT("Received new partial bunch."), Bunch, *InPartialBunch);
}
else
{
LogPartialBunch(TEXT("Received New partial bunch. It only contained NetGUIDs."), Bunch, *InPartialBunch);
}
}
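After these error cases are handled, a new InPartialBunch is created from the current Bunch, and the current Bunch's payload is appended to it. The payload of an initial partial Bunch must be byte-aligned; otherwise it is rejected as corrupt.
If the current Bunch is not the initial piece, its payload is appended to InPartialBunch, but only if all of the following hold:
- an InPartialBunch exists and is not already complete;
- both Bunches have the same bReliable flag;
- the Bunch's ChSequence is the one expected next.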
// Merge in next partial bunch to InPartialBunch if:
// -We have a valid InPartialBunch
// -The current InPartialBunch wasn't already complete
// -ChSequence is next in partial sequence
// -Reliability flag matches
bool bSequenceMatches = false;
if (InPartialBunch)
{
const bool bReliableSequencesMatches = Bunch.ChSequence == InPartialBunch->ChSequence + 1;
const bool bUnreliableSequenceMatches = bReliableSequencesMatches || (Bunch.ChSequence == InPartialBunch->ChSequence);
// Unreliable partial bunches use the packet sequence, and since we can merge multiple bunches into a single packet,
// it's perfectly legal for the ChSequence to match in this case.
// Reliable partial bunches must be in consecutive order though
bSequenceMatches = InPartialBunch->bReliable ? bReliableSequencesMatches : bUnreliableSequenceMatches;
}
if ( InPartialBunch && !InPartialBunch->bPartialFinal && bSequenceMatches && InPartialBunch->bReliable == Bunch.bReliable )
{
// Merge.
UE_LOG(LogNetPartialBunch, Verbose, TEXT("Merging Partial Bunch: %d Bytes"), Bunch.GetBytesLeft() );
if ( !Bunch.bHasPackageMapExports && Bunch.GetBitsLeft() > 0 )
{
InPartialBunch->AppendDataFromChecked( Bunch.GetDataPosChecked(), Bunch.GetBitsLeft() );
}
// Only the final partial bunch should ever be non byte aligned. This is enforced during partial bunch creation
// This is to ensure fast copies/appending of partial bunches. The final partial bunch may be non byte aligned.
if (!Bunch.bHasPackageMapExports && !Bunch.bPartialFinal && (Bunch.GetBitsLeft() % 8 != 0))
{
UE_LOG(LogNetPartialBunch, Warning, TEXT("Corrupt partial bunch. Non-final partial bunches are expected to be byte-aligned. bHasPackageMapExports = %d, bPartialFinal = %d, BitsLeft = %u. %s"),
Bunch.bHasPackageMapExports ? 1 : 0, Bunch.bPartialFinal ? 1 : 0, Bunch.GetBitsLeft(), *Describe());
Bunch.SetError();
return false;
}
// Advance the sequence of the current partial bunch so we know what to expect next
InPartialBunch->ChSequence = Bunch.ChSequence;
if (Bunch.bPartialFinal)
{
LogPartialBunch(TEXT("Completed Partial Bunch."), Bunch, *InPartialBunch);
if ( Bunch.bHasPackageMapExports )
{
// Shouldn't have these, they only go in initial partial export bunches
UE_LOG(LogNetPartialBunch, Warning, TEXT("Corrupt partial bunch. Final partial bunch has package map exports. %s"), *Describe());
Bunch.SetError();
return false;
}
HandleBunch = InPartialBunch;
InPartialBunch->bPartialFinal = true;
InPartialBunch->bClose = Bunch.bClose;
InPartialBunch->CloseReason = Bunch.CloseReason;
InPartialBunch->bIsReplicationPaused = Bunch.bIsReplicationPaused;
InPartialBunch->bHasMustBeMappedGUIDs = Bunch.bHasMustBeMappedGUIDs;
}
else
{
LogPartialBunch(TEXT("Received Partial Bunch."), Bunch, *InPartialBunch);
}
}
else
{
// (error handling for abnormal cases)
if (InPartialBunch)
{
delete InPartialBunch;
InPartialBunch = NULL;
}
}
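The sequence check has two branches:
- When bReliable is true, the new Bunch's ChSequence must be exactly one greater than InPartialBunch's.
- When bReliable is false, the new Bunch's ChSequence may either equal InPartialBunch's or be one greater.

The comment in the code explains why the equal case is allowed: unreliable partial Bunches use the packet sequence as their ChSequence, and several Bunches can be merged into a single Packet. Consecutive pieces that travel in the same Packet therefore legitimately share the same ChSequence.
After the data is appended, the code checks whether the current Bunch is the last piece of the split. If it is, every piece of the complete Bunch has arrived, and HandleBunch is set to InPartialBunch. Once a valid HandleBunch is available, ReceivedSequencedBunch is called to process the complete message: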
bool UChannel::ReceivedNextBunch( FInBunch & Bunch, bool & bOutSkipAck )
{
FInBunch* HandleBunch = &Bunch;
if (Bunch.bPartial)
{
// (partial-bunch merging code omitted)
}
if ( HandleBunch != NULL )
{
const bool bBothSidesCanOpen = Connection->Driver && Connection->Driver->ChannelDefinitionMap[ChName].bServerOpen && Connection->Driver->ChannelDefinitionMap[ChName].bClientOpen;
if ( HandleBunch->bOpen )
{
// (channel-open handling omitted)
}
if ( !bBothSidesCanOpen ) // Voice channels can open from both side simultaneously, so ignore this logic until we resolve this
{
// (channel-open handling omitted)
}
// Receive it in sequence.
return ReceivedSequencedBunch( *HandleBunch );
}
return false;
}
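Here is a condensed sketch of the merge rules described above. It assumes byte-string payloads and ignores NetGUID exports, the byte-alignment checks, and the error and bOutSkipAck paths taken when a reliable InPartialBunch is interrupted. `InBunchLite`, `PartialAssembler` and `MakePiece` are hypothetical names. The sequence test follows the two branches in the code: reliable pieces need ChSequence + 1, while unreliable pieces accept the same or the next ChSequence.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical, stripped-down bunch: only the fields the merge rules use.
struct InBunchLite
{
    bool bReliable = false;
    bool bPartial = false;
    bool bPartialInitial = false;
    bool bPartialFinal = false;
    int ChSequence = 0;
    std::string Data;
};

// Hypothetical helper: build one piece of a split bunch.
InBunchLite MakePiece(bool bReliable, bool bInitial, bool bFinal, int ChSequence, const std::string& Data)
{
    InBunchLite B;
    B.bReliable = bReliable;
    B.bPartial = true;
    B.bPartialInitial = bInitial;
    B.bPartialFinal = bFinal;
    B.ChSequence = ChSequence;
    B.Data = Data;
    return B;
}

// Hypothetical assembler; InPartial plays the role of InPartialBunch.
class PartialAssembler
{
public:
    // Returns the complete bunch to handle, or nullopt while still assembling
    // (or after a piece had to be discarded).
    std::optional<InBunchLite> Receive(const InBunchLite& Bunch)
    {
        if (!Bunch.bPartial)
        {
            return Bunch;                                 // not split: handle as-is
        }
        if (Bunch.bPartialInitial)
        {
            InPartial = Bunch;                            // start a new sequence, dropping any unfinished one
            InPartial->bPartialFinal = false;
            return std::nullopt;
        }
        bool bSequenceMatches = false;
        if (InPartial)
        {
            const bool bNext = Bunch.ChSequence == InPartial->ChSequence + 1;
            bSequenceMatches = InPartial->bReliable ? bNext
                                                    : (bNext || Bunch.ChSequence == InPartial->ChSequence);
        }
        if (InPartial && !InPartial->bPartialFinal && bSequenceMatches &&
            InPartial->bReliable == Bunch.bReliable)
        {
            InPartial->Data += Bunch.Data;                // merge
            InPartial->ChSequence = Bunch.ChSequence;     // advance the expected sequence
            if (Bunch.bPartialFinal)
            {
                InBunchLite Complete = *InPartial;
                Complete.bPartialFinal = true;
                InPartial.reset();
                return Complete;
            }
            return std::nullopt;
        }
        InPartial.reset();                                // mismatch: discard the partial sequence
        return std::nullopt;
    }

private:
    std::optional<InBunchLite> InPartial;
};
```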
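ReceivedSequencedBunch simply forwards the Bunch and handles channel closing. UChannel::ReceivedBunch is a pure virtual function, so parsing the Bunch's contents is left to the overrides in the three subclasses. We won't follow them further here; UActorChannel::ReceivedBunch will be covered in the later section on Actor replication: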
bool UChannel::ReceivedSequencedBunch( FInBunch& Bunch )
{
SCOPED_NAMED_EVENT(UChannel_ReceivedSequencedBunch, FColor::Green);
// Handle a regular bunch.
if ( !Closing )
{
ReceivedBunch( Bunch );
}
// We have fully received the bunch, so process it.
if( Bunch.bClose )
{
// (channel-close handling omitted)
return true;
}
return false;
}
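// Declared in class UChannel (shown out of class for brevity):
/** Handle an incoming bunch. */
virtual void ReceivedBunch( FInBunch& Bunch ) PURE_VIRTUAL(UChannel::ReceivedBunch,);
// Overridden by the three channel subclasses: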
void UActorChannel::ReceivedBunch( FInBunch & Bunch );
void UVoiceChannel::ReceivedBunch(FInBunch& Bunch);
void UControlChannel::ReceivedBunch( FInBunch& Bunch );
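Reliable Transmission
UDP is an unreliable protocol: unlike TCP, its specification gives no guarantee of reliable delivery. Any system that uses UDP as its underlying transport therefore has to build something like TCP's reliable delivery on top of it, and UE is no exception.

Reliable transmission needs two things:
- every Packet is assigned an increasing sequence number that uniquely identifies it;
- both sides confirm the packets they have received, the familiar ACK.

To maintain these sequence numbers and their acknowledgments on both ends, UE defines a dedicated structure, FNetPacketNotify, which holds several sequence-number member variables: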
/**
FNetPacketNotify - Drives delivery of sequence numbers, acknowledgments and notifications of delivery sequence numbers
*/
class FNetPacketNotify
{
private:
// Track incoming sequence data
SequenceHistoryT InSeqHistory; // BitBuffer containing a bitfield describing the history of received packets
SequenceNumberT InSeq; // Last sequence number received and accepted from remote
SequenceNumberT InAckSeq; // Last sequence number received from remote that we have acknowledged, this is needed since we support accepting a packet but explicitly not acknowledge it as received.
SequenceNumberT InAckSeqAck; // Last sequence number received from remote that we have acknowledged and also knows that the remote has received the ack, used to calculate how big our history must be
// Track outgoing sequence data
SequenceNumberT OutSeq; // Outgoing sequence number
SequenceNumberT OutAckSeq; // Last sequence number that we know that the remote side have received.
};
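InSeq and OutSeq are straightforward:
- InSeq is the highest sequence number received and accepted from the peer.
- OutSeq is the sequence number of the next packet to be sent.
- OutAckSeq is the highest of our own sequence numbers that the peer has acknowledged.
- InAckSeq is the highest received sequence number that we have acknowledged.

InAckSeq is not always equal to InSeq, because UE supports accepting a packet while explicitly not acknowledging it as received. These fields are initialized when the connection is established: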
void UNetConnection::InitSequence(int32 IncomingSequence, int32 OutgoingSequence)
{
// Make sure the sequence hasn't already been initialized on the server, and ignore multiple initializations on the client
check(InPacketId == -1 || Driver->ServerConnection != nullptr);
if (InPacketId == -1 && CVarRandomizeSequence.GetValueOnAnyThread() > 0)
{
// Initialize the base UNetConnection packet sequence (not very useful/effective at preventing attacks)
InPacketId = IncomingSequence - 1;
OutPacketId = OutgoingSequence;
OutAckPacketId = OutgoingSequence - 1;
LastNotifiedPacketId = OutAckPacketId;
// Initialize the reliable packet sequence (more useful/effective at preventing attacks)
InitInReliable = IncomingSequence & (MAX_CHSEQUENCE - 1);
InitOutReliable = OutgoingSequence & (MAX_CHSEQUENCE - 1);
InReliable.Init(InitInReliable, InReliable.Num());
OutReliable.Init(InitOutReliable, OutReliable.Num());
PacketNotify.Init(InPacketId, OutPacketId);
UE_LOG(LogNet, Verbose, TEXT("InitSequence: IncomingSequence: %i, OutgoingSequence: %i, InitInReliable: %i, InitOutReliable: %i"), IncomingSequence, OutgoingSequence, InitInReliable, InitOutReliable);
}
}
void FNetPacketNotify::Init(SequenceNumberT InitialInSeq, SequenceNumberT InitialOutSeq)
{
InSeqHistory.Reset();
InSeq = InitialInSeq;
InAckSeq = InitialInSeq;
InAckSeqAck = InitialInSeq;
OutSeq = InitialOutSeq;
OutAckSeq = SequenceNumberT(InitialOutSeq.Get() - 1);
}
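The last field, InAckSeqAck, is harder to put into words. It is the highest received sequence number that we have acknowledged and for which we also know the peer has received our ACK. An example shows how these fields are updated.
When A and B establish a connection, they agree on initial sequence numbers during the handshake. Suppose A's initial Seq is 100 and B's is 200. Then:
- A.InSeq=199, A.InAckSeq=199, A.InAckSeqAck=199, A.OutSeq=100, A.OutAckSeq=99
- B.InSeq=99, B.InAckSeq=99, B.InAckSeqAck=99, B.OutSeq=200, B.OutAckSeq=199

The exchange then proceeds as follows:
- A sends B a packet with sequence number 100. Its header carries the ACK for B's packet 199 (A.InAckSeq=199). After sending, A.OutSeq advances to 101.
- When B receives it, B.InSeq=100 and B.OutAckSeq remains 199. B does not rush to send the ACK for 100; it waits to piggyback it on a later outgoing packet.
- Later, B sends A packet 200 and includes the ACK for A's packet 100, so B.InAckSeq=100. After sending, B.OutSeq advances to 201.
- When A receives packet 200, A.InSeq=200. A now knows that B received A's packet 100, so A.OutAckSeq=100.
- A then sends B packet 101 carrying the ACK for B's packet 200, so A.InAckSeq=200. A.OutSeq advances to 102.
- When B receives packet 101, B.InSeq=101. B parses from it the ACK for its own packet 200, so B.OutAckSeq=200.
- B's packet 200 carried the ACK for A's packet 100. So B now knows that A has received B's acknowledgment of packet 100, and B can set B.InAckSeqAck=100.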

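In UE's reliable UDP design, in-order delivery does not rest on receiving Packets in order. It rests on processing reliable Bunches in order, because every reliable Bunch is assigned a consecutive, increasing sequence number within its Channel. When UE receives a Packet, it extracts all Bunches carried in it and handles each one according to whether it is reliable:
- An unreliable Bunch is passed straight to the logic layer.
- A reliable Bunch is handed to its Channel for ordering. It is dispatched to the logic layer only once it continues the consecutive sequence.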
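Under this design, each ACK in UE refers to one individual packet having been received. It is not cumulative as in TCP, where an acknowledgment covers everything before it. When the ACK information sent back by the peer shows that a packet was not received, that packet is treated as lost:
- If the lost packet carried no reliable data, nothing is done locally.
- If it carried reliable Bunches, UE takes all of them out and puts them back into the Bunch send queue, where they wait to be packed into a new Packet and resent.

For example, suppose A sends B four packets:
- 101: unreliable
- 102: unreliable
- 103: reliable, containing Bunch(10)
- 104: reliable, containing Bunch(11)

B receives only 101 and 104, and processes them in ascending order. When it processes 104, it finds that the reliable Bunch sequence number 11 does not follow on from 9, the highest reliable Bunch sequence received so far, so Bunch(11) is held back for now.

When B next sends to A, it includes ACKs for 101 and 104. From these ACKs A learns that 102 and 103 were lost. Packet 102 is unreliable, so it is not resent. Packet 103 is reliable, so its Bunch(10) is taken out and placed at the front of the Bunch send queue. A then builds a new reliable packet, 105, containing Bunch(10) and Bunch(12).

When B receives 105, it merges Bunch(10) and Bunch(12) with the held Bunch(11) and sorts them. It then repeatedly takes every Bunch that can now be accepted in order after Bunch(9) and dispatches each one to the logic layer.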
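A sender-side sketch of the behaviour in the example above follows, using the same simplified model: a NAK for a packet re-queues that packet's reliable Bunches at the front of the send queue, and unreliable contents are forgotten. `RetransmitQueue` and its methods are hypothetical. In UE itself, channels keep unacknowledged reliable Bunches in OutRec and handle NAKs per channel (UChannel::ReceivedNak), which this sketch does not model.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <vector>

// Hypothetical model: bunches are identified by their reliable sequence number.
class RetransmitQueue
{
public:
    // Called when a packet is sent; ReliableBunches lists the reliable bunches it carried.
    void OnPacketSent(int PacketId, const std::vector<int>& ReliableBunches)
    {
        if (!ReliableBunches.empty())
        {
            InFlight[PacketId] = ReliableBunches;         // unreliable-only packets are not tracked
        }
    }

    // The peer's ack information says PacketId arrived: forget its bunches.
    void OnAck(int PacketId) { InFlight.erase(PacketId); }

    // The peer's ack information says PacketId was lost: re-queue its reliable bunches first.
    void OnNak(int PacketId)
    {
        auto It = InFlight.find(PacketId);
        if (It == InFlight.end())
        {
            return;                                       // nothing reliable to resend
        }
        SendQueue.insert(SendQueue.begin(), It->second.begin(), It->second.end());
        InFlight.erase(It);
    }

    void QueueNew(int BunchSeq) { SendQueue.push_back(BunchSeq); }

    // Take everything queued for the next packet.
    std::vector<int> BuildNextPacket()
    {
        std::vector<int> Contents(SendQueue.begin(), SendQueue.end());
        SendQueue.clear();
        return Contents;
    }

private:
    std::map<int, std::vector<int>> InFlight;             // packet id -> reliable bunches
    std::deque<int> SendQueue;
};
```

Replaying the example (ACK 101 and 104, NAK 102 and 103, then queue Bunch(12)) makes the next packet contain Bunch(10) followed by Bunch(12).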

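That is the overall framework of UE's reliable UDP; next come the implementation details. First, Packet sequence numbers are allocated by FNetPacketNotify::CommitAndIncrementOutSeq, which is called from FlushNet, introduced earlier: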
FNetPacketNotify::SequenceNumberT FNetPacketNotify::CommitAndIncrementOutSeq()
{
// we have not written a header...this is a fail.
check(WrittenHistoryWordCount != 0);
// Add entry to the ack-record so that we can update the InAckSeqAck when we received the ack for this OutSeq.
AckRecord.Enqueue( {OutSeq, WrittenInAckSeq} );
WrittenHistoryWordCount = 0u;
return ++OutSeq;
}
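Besides incrementing OutSeq, this function pushes an entry onto the AckRecord queue. AckRecord records, for every packet sent, its sequence number together with the ACK sequence number that packet carried: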
struct FSentAckData
{
SequenceNumberT OutSeq; // Sequence number of the sent packet
SequenceNumberT InAckSeq; // ACK sequence number (our InAckSeq) carried by that packet
};
typedef TResizableCircularQueue<FSentAckData, TInlineAllocator<128>> AckRecordT;
AckRecordT AckRecord; // Track acked seq for each sent packet to track size of ack history
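When the peer acknowledges one of our packets, we can look up in AckRecord the ACK that packet carried to the peer, and update InAckSeqAck accordingly. FNetPacketNotify::UpdateInAckSeqAck performs this lookup from packet sequence number to ACK sequence number. Its AckCount parameter is the number of newly acknowledged packets, so the entries for all but the last one are popped and discarded: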
FNetPacketNotify::SequenceNumberT FNetPacketNotify::UpdateInAckSeqAck(SequenceNumberT::DifferenceT AckCount, SequenceNumberT AckedSeq)
{
if ((SIZE_T)AckCount <= AckRecord.Count())
{
if (AckCount > 1)
{
AckRecord.PopNoCheck(AckCount - 1);
}
FSentAckData AckData = AckRecord.PeekNoCheck();
AckRecord.PopNoCheck();
// verify that we have a matching sequence number
if (AckData.OutSeq == AckedSeq)
{
return AckData.InAckSeq;
}
}
// Pessimistic view, should never occur but we do want to know about it if it would
ensureMsgf(false, TEXT("FNetPacketNotify::UpdateInAckSeqAck - Failed to find matching AckRecord for %u"), AckedSeq.Get());
return SequenceNumberT(AckedSeq.Get() - MaxSequenceHistoryLength);
}
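To tie the fields together, the sketch below replays the A/B example from earlier with hypothetical types (`Header`, `Endpoint`). It makes several assumptions:
- plain ints, without 14-bit wraparound;
- no ack-history bits, only AckedSeq;
- a packet is acknowledged (InAckSeq advanced) as soon as it is accepted, so the next header written carries it.

The AckRecord handling follows CommitAndIncrementOutSeq and UpdateInAckSeqAck above.

```cpp
#include <cassert>
#include <deque>

struct Header { int Seq; int AckedSeq; };               // hypothetical: no history bits

// Hypothetical, simplified stand-in for FNetPacketNotify.
struct Endpoint
{
    int InSeq, InAckSeq, InAckSeqAck, OutSeq, OutAckSeq;
    struct SentAck { int OutSeq; int InAckSeq; };
    std::deque<SentAck> AckRecord;

    // Same initial values as FNetPacketNotify::Init.
    Endpoint(int InitialInSeq, int InitialOutSeq)
        : InSeq(InitialInSeq), InAckSeq(InitialInSeq), InAckSeqAck(InitialInSeq),
          OutSeq(InitialOutSeq), OutAckSeq(InitialOutSeq - 1) {}

    // WriteHeader + CommitAndIncrementOutSeq.
    Header Send()
    {
        Header H{OutSeq, InAckSeq};
        AckRecord.push_back({OutSeq, InAckSeq});
        ++OutSeq;
        return H;
    }

    void Receive(const Header& H)
    {
        if (H.Seq <= InSeq) return;                       // not newer: ignored
        InSeq = H.Seq;
        InAckSeq = H.Seq;                                 // acknowledge it (AckSeq)
        if (H.AckedSeq > OutAckSeq)
        {
            // UpdateInAckSeqAck: pop AckCount entries; the last one is the acked packet.
            const int AckCount = H.AckedSeq - OutAckSeq;
            for (int i = 1; i < AckCount && !AckRecord.empty(); ++i) AckRecord.pop_front();
            if (!AckRecord.empty() && AckRecord.front().OutSeq == H.AckedSeq)
            {
                InAckSeqAck = AckRecord.front().InAckSeq;
            }
            if (!AckRecord.empty()) AckRecord.pop_front();
            OutAckSeq = H.AckedSeq;
        }
    }
};
```

After the three packets of the example, B.InAckSeqAck is 100, B.OutAckSeq is 200 and A.OutAckSeq is 100, as described in the walkthrough.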
With sequence numbers in place, every Packet must carry its own sequence number and the ACK sequence number when it is sent. This happens in WritePacketHeader, introduced earlier:
// IMPORTANT:
// WritePacketHeader must ALWAYS write the exact same number of bits as we go back and rewrite the header
// right before we put the packet on the wire.
void UNetConnection::WritePacketHeader(FBitWriter& Writer)
{
// If this is a header refresh, we only serialize the updated serial number information
const bool bIsHeaderUpdate = Writer.GetNumBits() > 0u;
// Header is always written first in the packet
FBitWriterMark Reset;
FBitWriterMark Restore(Writer);
Reset.PopWithoutClear(Writer);
// Write notification header or refresh the header if used space is the same.
bool bWroteHeader = PacketNotify.WriteHeader(Writer, bIsHeaderUpdate);
#if !UE_BUILD_SHIPPING
checkf(Writer.GetNumBits() <= MAX_PACKET_RELIABLE_SEQUENCE_HEADER_BITS, TEXT("WritePacketHeader exceeded the max allowed bits. Wrote %d. Max %d"), Writer.GetNumBits(), MAX_PACKET_RELIABLE_SEQUENCE_HEADER_BITS);
#endif
// (some code omitted)
}
// These methods must always write and read the exact same number of bits, that is the reason for not using WriteInt/WrittedWrappedInt
bool FNetPacketNotify::WriteHeader(FBitWriter& Writer, bool bRefresh)
{
// we always write at least 1 word
SIZE_T CurrentHistoryWordCount = FMath::Clamp<SIZE_T>((GetCurrentSequenceHistoryLength() + SequenceHistoryT::BitsPerWord - 1u) / SequenceHistoryT::BitsPerWord, 1u, SequenceHistoryT::WordCount);
// We can only do a refresh if we do not need more space for the history
if (bRefresh && (CurrentHistoryWordCount > WrittenHistoryWordCount))
{
return false;
}
// How many words of ack data should we write? If this is a refresh we must write the same size as the original header
WrittenHistoryWordCount = bRefresh ? WrittenHistoryWordCount : CurrentHistoryWordCount;
// This is the last InAck we have acknowledged at this time
WrittenInAckSeq = InAckSeq;
SequenceNumberT::SequenceT Seq = OutSeq.Get();
SequenceNumberT::SequenceT AckedSeq = InAckSeq.Get();
// Pack data into a uint
uint32 PackedHeader = FPackedHeader::Pack(Seq, AckedSeq, WrittenHistoryWordCount - 1);
// Write packed header
Writer << PackedHeader;
// Write ack history
InSeqHistory.Write(Writer, WrittenHistoryWordCount);
UE_LOG_PACKET_NOTIFY(TEXT("FNetPacketNotify::WriteHeader - Seq %u, AckedSeq %u bReFresh %u HistorySizeInWords %u"), Seq, AckedSeq, bRefresh ? 1u : 0u, WrittenHistoryWordCount);
return true;
}
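WriteHeader begins by writing a uint32 built from three fields, which together make exactly 32 bits:
- Seq: the sequence number of the current Packet (14 bits);
- AckedSeq: the latest received sequence number we have acknowledged, i.e. InAckSeq (14 bits);
- WrittenHistoryWordCount minus 1 (4 bits).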
struct FPackedHeader
{
using SequenceNumberT = FNetPacketNotify::SequenceNumberT;
static_assert(FNetPacketNotify::SequenceNumberBits <= 14, "SequenceNumbers must be smaller than 14 bits to fit history word count");
enum { HistoryWordCountBits = 4 };
enum { SeqMask = (1 << FNetPacketNotify::SequenceNumberBits) - 1 };
enum { HistoryWordCountMask = (1 << HistoryWordCountBits) - 1 };
enum { AckSeqShift = HistoryWordCountBits };
enum { SeqShift = AckSeqShift + FNetPacketNotify::SequenceNumberBits };
static uint32 Pack(SequenceNumberT Seq, SequenceNumberT AckedSeq, SIZE_T HistoryWordCount)
{
uint32 Packed = 0u;
Packed |= Seq.Get() << SeqShift;
Packed |= AckedSeq.Get() << AckSeqShift;
Packed |= HistoryWordCount & HistoryWordCountMask;
return Packed;
}
static SequenceNumberT GetSeq(uint32 Packed) { return SequenceNumberT(Packed >> SeqShift & SeqMask); }
static SequenceNumberT GetAckedSeq(uint32 Packed) { return SequenceNumberT(Packed >> AckSeqShift & SeqMask); }
static SIZE_T GetHistoryWordCount(uint32 Packed) { return (Packed & HistoryWordCountMask); }
};
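Here is a standalone sketch of the same bit layout with the constants written out:
- Seq in bits 18 to 31;
- AckedSeq in bits 4 to 17;
- history word count minus 1 in bits 0 to 3.

The function names are hypothetical; they follow FPackedHeader's shifts and masks, plus the -1/+1 adjustment done in WriteHeader and ReadHeader.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical standalone helpers mirroring FPackedHeader.
constexpr uint32_t SequenceNumberBits = 14;
constexpr uint32_t HistoryWordCountBits = 4;
constexpr uint32_t SeqMask = (1u << SequenceNumberBits) - 1;      // 0x3FFF
constexpr uint32_t HistoryWordCountMask = (1u << HistoryWordCountBits) - 1;
constexpr uint32_t AckSeqShift = HistoryWordCountBits;             // 4
constexpr uint32_t SeqShift = AckSeqShift + SequenceNumberBits;    // 18

// HistoryWordCount is the number of 32-bit words written (1..8); the header stores it minus 1.
uint32_t PackHeader(uint32_t Seq, uint32_t AckedSeq, uint32_t HistoryWordCount)
{
    uint32_t Packed = 0;
    Packed |= (Seq & SeqMask) << SeqShift;
    Packed |= (AckedSeq & SeqMask) << AckSeqShift;
    Packed |= (HistoryWordCount - 1) & HistoryWordCountMask;
    return Packed;
}

uint32_t UnpackSeq(uint32_t Packed) { return (Packed >> SeqShift) & SeqMask; }
uint32_t UnpackAckedSeq(uint32_t Packed) { return (Packed >> AckSeqShift) & SeqMask; }
uint32_t UnpackHistoryWordCount(uint32_t Packed) { return (Packed & HistoryWordCountMask) + 1; }
```

With both sequence numbers at their maximum of 16383 and 8 history words, every bit except bit 3 is set (0xFFFFFFF7), which shows that the three fields use all 32 bits.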

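With this layout only 14 bits are available for the sequence number. The maximum is therefore 16383, which under normal traffic is exceeded within a few minutes. UE handles overflow by simply wrapping around: the value after 16383 is 0. To support addition, subtraction and comparison under wraparound, UE defines a dedicated type for packet sequence numbers, SequenceNumberT: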
/** Helper class to work with sequence numbers */
template <SIZE_T NumBits, typename SequenceType>
class TSequenceNumber
{
static_assert(TIsSigned<SequenceType>::Value == false, "The base type for sequence numbers must be unsigned");
public:
using SequenceT = SequenceType;
using DifferenceT = int32;
// Constants
enum { SeqNumberBits = NumBits };
enum { SeqNumberCount = SequenceT(1) << NumBits };
enum { SeqNumberHalf = SequenceT(1) << (NumBits - 1) };
enum { SeqNumberMax = SeqNumberCount - 1u };
enum { SeqNumberMask = SeqNumberMax };
/** Default constructor */
TSequenceNumber() : Value(0u) {}
/** Constructor with given value */
TSequenceNumber(SequenceT ValueIn) : Value(ValueIn & SeqNumberMask) {}
/** Get Current Value */
SequenceT Get() const { return Value; }
/** Diff between sequence numbers (A - B) only valid if (A - B) < SeqNumberHalf */
static DifferenceT Diff(TSequenceNumber A, TSequenceNumber B);
/** return true if this is > Other, this is only considered to be the case if (A - B) < SeqNumberHalf since we have to be able to detect wraparounds */
bool operator>(const TSequenceNumber& Other) const { return (Value != Other.Value) && (((Value - Other.Value) & SeqNumberMask) < SeqNumberHalf); }
/** Check if this is >= Other, See above */
bool operator>=(const TSequenceNumber& Other) const { return ((Value - Other.Value) & SeqNumberMask) < SeqNumberHalf; }
/** Pre-increment and wrap around */
TSequenceNumber& operator++() { Increment(1u); return *this; }
/** Post-increment and wrap around */
TSequenceNumber operator++(int) { TSequenceNumber Tmp(*this); Increment(1u); return Tmp; }
private:
void Increment(SequenceT InValue) { *this = TSequenceNumber(Value + InValue); }
SequenceT Value;
};
enum { SequenceNumberBits = 14 };
enum { MaxSequenceHistoryLength = 256 };
typedef TSequenceNumber<SequenceNumberBits, uint16> SequenceNumberT;
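The comparison logic is somewhat convoluted. Put simply, A is considered greater than B only when the wrapped distance from B forward to A, ((A - B) & 16383), is nonzero and less than 8192 (half the sequence space). Broken down by the plain integer difference int(A) - int(B):
- 0 < int(A) - int(B) < 8192: A is greater than B. Example: A=2, B=1
- 8192 < int(A) - int(B) < 16384: B is greater than A, because B is treated as having wrapped around. Example: A=16000, B=1
- -8192 < int(A) - int(B) < 0: A is less than B. Example: A=1, B=4000
- -16384 < int(A) - int(B) < -8192: A is greater than B, because A is treated as having wrapped around. Example: A=1, B=14000
- |int(A) - int(B)| = 8192: operator> is false in both directions.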
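Here is a small standalone reimplementation of operator> for 14-bit sequence numbers that checks each case above. `SeqGreater` is a hypothetical name. It uses the same expression as TSequenceNumber::operator>, with the subtraction done in int, as it is in UE after the uint16 operands are promoted.

```cpp
#include <cassert>
#include <cstdint>

constexpr int SeqNumberBits = 14;
constexpr int SeqNumberMask = (1 << SeqNumberBits) - 1;  // 16383
constexpr int SeqNumberHalf = 1 << (SeqNumberBits - 1); // 8192

// Hypothetical helper, same test as TSequenceNumber::operator>: A > B iff the
// wrapped distance from B forward to A is nonzero and less than half the space.
bool SeqGreater(uint16_t A, uint16_t B)
{
    return A != B && ((int(A) - int(B)) & SeqNumberMask) < SeqNumberHalf;
}
```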
Once the comparison rule is clear, the difference calculation is easier to follow:
template <SIZE_T NumBits, typename SequenceType>
typename TSequenceNumber<NumBits, SequenceType>::DifferenceT TSequenceNumber<NumBits, SequenceType>::Diff(TSequenceNumber A, TSequenceNumber B)
{
constexpr SIZE_T ShiftValue = sizeof(DifferenceT)*8 - NumBits;
const SequenceT ValueA = A.Value;
const SequenceT ValueB = B.Value;
return (DifferenceT)((ValueA - ValueB) << ShiftValue) >> ShiftValue;
};
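Take A=10, B=250 to see how the function works:
- Because of C++ integer promotion, the subtraction of the two uint16 values is actually done in int, so ValueA - ValueB is simply -240, not 65296.
- The shift left by ShiftValue (32 - 14 = 18 bits), followed by an arithmetic shift right by the same amount, keeps only the low 14 bits of the difference and sign-extends them. This maps any result into [-8192, 8191]; here the result stays -240.
- The shifts matter when the values wrap around. For A=10, B=16370 the raw difference is -16360, and after the shifts the result is 24, meaning A is 24 sequence numbers ahead of B.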
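Here is a portable sketch of Diff that avoids shifting negative values, since left-shifting a negative int is undefined behaviour before C++20. `SeqDiff` is a hypothetical name. Keeping the low 14 bits and subtracting 16384 when the result is 8192 or more is equivalent to UE's shift-left-then-shift-right sign extension.

```cpp
#include <cassert>
#include <cstdint>

constexpr int SeqNumberBits = 14;
constexpr int SeqNumberCount = 1 << SeqNumberBits;       // 16384
constexpr int SeqNumberHalf = SeqNumberCount / 2;       // 8192

// Hypothetical helper: signed distance A - B on the 14-bit ring, in [-8192, 8191].
int SeqDiff(uint16_t A, uint16_t B)
{
    int Delta = (int(A) - int(B)) & (SeqNumberCount - 1); // keep the low 14 bits: [0, 16383]
    if (Delta >= SeqNumberHalf)
    {
        Delta -= SeqNumberCount;                          // sign-extend the 14-bit value
    }
    return Delta;
}
```

SeqDiff(10, 250) gives -240 and SeqDiff(10, 16370) gives 24, matching the worked example above.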
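WrittenHistoryWordCount is the size of the variable-length data written after the PackedHeader integer:
- In the header it occupies only 4 bits and is stored minus 1.
- It is clamped to the range [1, 8].
- Its unit is the word (typedef uint32 wordT), so the variable-length data is word-aligned.
- Each word is 4 bytes, which caps the data at 32 bytes, or 256 bits.
The data written by InSeqHistory.Write(Writer, WrittenHistoryWordCount) records the reception status of a run of consecutive packets ending at the currently acknowledged sequence number AckedSeq. A received packet's bit is 1 and a missing packet's bit is 0. The bits are set by FNetPacketNotify::AckSeq:
- Its first argument is the sequence number to acknowledge.
- It loops, advancing InAckSeq one step at a time, and marks every intermediate packet with 0 (lost).
- The status of the last packet comes from the IsAck argument.

FNetPacketNotify wraps this in two helpers: the single-argument AckSeq confirms that a packet was received, and NakSeq confirms that it was lost: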
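Here is the word-count arithmetic from WriteHeader as a standalone function. It assumes 32-bit words and the 256-bit maximum described above. `HistoryWordCountFor` is a hypothetical name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

constexpr size_t BitsPerWord = 32;                        // wordT is uint32
constexpr size_t MaxWords = 256 / BitsPerWord;            // 8 words = 256 bits

// Hypothetical helper: round the number of history bits up to whole words,
// then clamp to [1, MaxWords], as in WriteHeader.
size_t HistoryWordCountFor(size_t HistoryLengthBits)
{
    const size_t Words = (HistoryLengthBits + BitsPerWord - 1) / BitsPerWord;
    return std::clamp<size_t>(Words, 1, MaxWords);
}
```

The stored value, WordCount - 1, is at most 7, so it fits in the 4-bit field with room to spare.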
void FNetPacketNotify::AckSeq(SequenceNumberT AckedSeq, bool IsAck)
{
check( AckedSeq == InSeq);
while (AckedSeq > InAckSeq)
{
++InAckSeq;
const bool bReportAcked = InAckSeq == AckedSeq ? IsAck : false;
UE_LOG_PACKET_NOTIFY(TEXT("FNetPacketNotify::AckSeq - AckedSeq: %u, IsAck %u"), InAckSeq.Get(), bReportAcked ? 1u : 0u);
InSeqHistory.AddDeliveryStatus(bReportAcked);
}
}
/** Mark Seq as received and update current InSeq, missing sequence numbers will be marked as lost */
void AckSeq(SequenceNumberT Seq) { AckSeq(Seq, true); }
/** Explicitly mark Seq as not received and update current InSeq, additional missing sequence numbers will be marked as lost */
void NakSeq(SequenceNumberT Seq) { AckSeq(Seq, false); }
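Here is a sketch of how AckSeq/NakSeq fill the history. It assumes each new status is shifted in at bit 0, so bit i describes the packet i positions before the newest; this is our reading of UE's SequenceHistory and is labelled as an assumption. `SeqHistorySketch` is hypothetical and uses plain ints without wraparound.

```cpp
#include <bitset>
#include <cassert>

// Hypothetical stand-in for InSeqHistory plus the InAckSeq cursor.
class SeqHistorySketch
{
public:
    explicit SeqHistorySketch(int InitialInAckSeq) : InAckSeq(InitialInAckSeq) {}

    // Mirrors FNetPacketNotify::AckSeq(AckedSeq, IsAck): every skipped sequence
    // is recorded as lost (0); AckedSeq itself gets IsAck.
    void AckSeq(int AckedSeq, bool IsAck)
    {
        while (AckedSeq > InAckSeq)
        {
            ++InAckSeq;
            const bool bReportAcked = (InAckSeq == AckedSeq) ? IsAck : false;
            History <<= 1;                                // assumption: oldest status falls off the end
            History[0] = bReportAcked;
        }
    }
    void AckSeq(int Seq) { AckSeq(Seq, true); }
    void NakSeq(int Seq) { AckSeq(Seq, false); }

    // Was the packet Offset positions before InAckSeq delivered?
    bool IsDelivered(int Offset) const { return History[Offset]; }
    int LastAcked() const { return InAckSeq; }

private:
    int InAckSeq;
    std::bitset<256> History;                             // 256 bits, the maximum history size
};
```

For example, after AckSeq(100), AckSeq(103) and NakSeq(104), the history reads, from newest to oldest: 104 lost, 103 received, 102 lost, 101 lost, 100 received.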
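The variable-length data holds at most 256 bits, so it can describe only the 256 most recent packets; the word count is clamped to SequenceHistoryT::WordCount for this reason. The check at the start of WriteHeader handles a related constraint. When the header is refreshed (bRefresh, i.e. rewritten in place just before the packet goes on the wire), it must keep the size it was originally written with. If the history now needs more words than WrittenHistoryWordCount, WriteHeader returns false instead of rewriting: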
bool FNetPacketNotify::WriteHeader(FBitWriter& Writer, bool bRefresh)
{
// we always write at least 1 word
SIZE_T CurrentHistoryWordCount = FMath::Clamp<SIZE_T>((GetCurrentSequenceHistoryLength() + SequenceHistoryT::BitsPerWord - 1u) / SequenceHistoryT::BitsPerWord, 1u, SequenceHistoryT::WordCount);
// We can only do a refresh if we do not need more space for the history
if (bRefresh && (CurrentHistoryWordCount > WrittenHistoryWordCount))
{
return false;
}
// (remaining code omitted)
}
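The InSeqHistory bits are written into the Packet so that a single Packet can acknowledge many received packets at once. Sending a separate ACK message for every received packet would waste a great deal of bandwidth. Representing each packet's status with a single bit makes batch acknowledgment of many packets very efficient.
Although InSeqHistory can hold up to 256 bits, writing the status of all 256 packets before the acknowledged one every time would still waste bandwidth. GetCurrentSequenceHistoryLength therefore works out how many statuses actually need to be written. If the peer already knows the status of every packet up to some packet A, only the statuses from after A up to InAckSeq need to be sent, i.e. the packets in the interval (InAckSeqAck, InAckSeq]. Determining which statuses the peer does not yet know is the sole purpose of InAckSeqAck: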
SIZE_T FNetPacketNotify::GetCurrentSequenceHistoryLength() const
{
if (InAckSeq >= InAckSeqAck)
{
return (SIZE_T)SequenceNumberT::Diff(InAckSeq, InAckSeqAck);
}
else
{
// Worst case send full history
return SequenceHistoryT::Size;
}
}
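Here is a sketch of GetCurrentSequenceHistoryLength on 14-bit wrapped values, using the comparison and difference rules described earlier. All helpers are hypothetical reimplementations. As in the original, when InAckSeqAck is somehow ahead of InAckSeq, the full 256-entry history is sent.

```cpp
#include <cassert>
#include <cstdint>

constexpr int SeqMask = (1 << 14) - 1;                    // 16383
constexpr int SeqHalf = 1 << 13;                          // 8192
constexpr int MaxSequenceHistoryLength = 256;

// Hypothetical helper: TSequenceNumber::operator>= on 14-bit values.
bool SeqGreaterOrEqual(uint16_t A, uint16_t B)
{
    return ((int(A) - int(B)) & SeqMask) < SeqHalf;
}

// Hypothetical helper: TSequenceNumber::Diff, written without shifting negative values.
int SeqDiff(uint16_t A, uint16_t B)
{
    const int Delta = (int(A) - int(B)) & SeqMask;
    return Delta >= SeqHalf ? Delta - (SeqMask + 1) : Delta;
}

// Number of ack statuses the peer may not know yet: those in (InAckSeqAck, InAckSeq].
int CurrentSequenceHistoryLength(uint16_t InAckSeq, uint16_t InAckSeqAck)
{
    if (SeqGreaterOrEqual(InAckSeq, InAckSeqAck))
    {
        return SeqDiff(InAckSeq, InAckSeqAck);
    }
    return MaxSequenceHistoryLength;                      // worst case: send the full history
}
```

The result is a bit count. WriteHeader then rounds it up to whole 32-bit words and clamps it to between 1 and 8 words.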
When the peer receives this Packet, UNetConnection::ReceivedPacket must parse the header in the same format that WriteHeader used:
bool FNetPacketNotify::ReadHeader(FNotificationHeader& Data, FBitReader& Reader) const
{
// Read packed header
uint32 PackedHeader = 0;
Reader << PackedHeader;
// unpack
Data.Seq = FPackedHeader::GetSeq(PackedHeader);
Data.AckedSeq = FPackedHeader::GetAckedSeq(PackedHeader);
Data.HistoryWordCount = FPackedHeader::GetHistoryWordCount(PackedHeader) + 1;
// Read ack history
Data.History.Read(Reader, Data.HistoryWordCount);
UE_LOG_PACKET_NOTIFY(TEXT("FNetPacketNotify::ReadHeader - Seq %u, AckedSeq %u HistorySizeInWords %u"), Data.Seq.Get(), Data.AckedSeq.Get(), Data.HistoryWordCount);
return Reader.IsError() == false;
}
void UNetConnection::ReceivedPacket( FBitReader& Reader, bool bIsReinjectedPacket)
{
SCOPED_NAMED_EVENT(UNetConnection_ReceivedPacket, FColor::Green);
AssertValid();
// Handle PacketId.
if( Reader.IsError() )
{
ensureMsgf(false, TEXT("Packet too small") );
return;
}
// (some code omitted)
FChannelsToClose ChannelsToClose;
if (IsInternalAck())
{
++InPacketId;
}
else
{
// Read packet header
FNetPacketNotify::FNotificationHeader Header;
if (!PacketNotify.ReadHeader(Header, Reader))
{
CLOSE_CONNECTION_DUE_TO_SECURITY_VIOLATION(this, ESecurityEvent::Malformed_Packet, TEXT("Failed to read PacketHeader"));
return;
}
// (remaining code omitted)
}
}
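Once the header has been parsed, the new Packet's sequence number is known and must be compared with the highest sequence number received so far. Under UE's current receive design, a packet is processed only if its sequence number is greater than the recorded InSeq; otherwise it is treated as invalid. GetSequenceDelta also requires the header's AckedSeq to be plausible: not older than OutAckSeq, and older than OutSeq (i.e. something we have actually sent). If any condition fails, it returns 0. ReceivedPacket therefore starts by computing this sequence delta: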
/**
* Gets the delta between the present sequence, and the sequence inside the specified header - if the delta is positive
*/
SequenceNumberT::DifferenceT GetSequenceDelta(const FNotificationHeader& NotificationData)
{
if (NotificationData.Seq > InSeq && NotificationData.AckedSeq >= OutAckSeq && OutSeq > NotificationData.AckedSeq)
{
return SequenceNumberT::Diff(NotificationData.Seq, InSeq);
}
else
{
return 0;
}
}
void UNetConnection::ReceivedPacket( FBitReader& Reader, bool bIsReinjectedPacket)
{
// ... (code shown earlier omitted)
const int32 PacketSequenceDelta = PacketNotify.GetSequenceDelta(Header);
if(PacketSequenceDelta > 0)
{
// ... (omitted for now)
}
else
{
// ... (omitted for now)
}
// ... (remaining code omitted)
}
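GetSequenceDelta compares sequence numbers that wrap around. The sketch below shows the idea with plain integers, assuming 14-bit sequence numbers. SeqDiff, SeqGreater and GetSequenceDeltaSketch are hypothetical names, and this is a generic serial-number comparison, not UE's TSequenceNumber implementation.

```cpp
#include <cassert>
#include <cstdint>

constexpr uint32_t kSeqBits = 14;                 // assumed sequence width
constexpr uint32_t kSeqCount = 1u << kSeqBits;    // 16384
constexpr uint32_t kSeqMask = kSeqCount - 1;

// Signed distance A - B modulo 2^14, mapped to [-8192, 8191].
inline int32_t SeqDiff(uint32_t A, uint32_t B) {
    const uint32_t Raw = (A - B) & kSeqMask;
    return Raw >= kSeqCount / 2 ? int32_t(Raw) - int32_t(kSeqCount) : int32_t(Raw);
}

inline bool SeqGreater(uint32_t A, uint32_t B) { return SeqDiff(A, B) > 0; }

// Same shape as GetSequenceDelta: a positive delta only when Seq is newer than
// InSeq and AckedSeq lies in [OutAckSeq, OutSeq).
inline int32_t GetSequenceDeltaSketch(uint32_t Seq, uint32_t AckedSeq, uint32_t InSeq,
                                      uint32_t OutAckSeq, uint32_t OutSeq) {
    const bool bAckedSeqValid = !SeqGreater(OutAckSeq, AckedSeq) && SeqGreater(OutSeq, AckedSeq);
    if (SeqGreater(Seq, InSeq) && bAckedSeqValid) {
        return SeqDiff(Seq, InSeq);
    }
    return 0;
}
```

For example, SeqDiff(1, 16383) is 2, so a packet numbered 1 counts as newer than 16383 after the counter wraps.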
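If the delta is not positive (GetSequenceDelta returns 0 for an old, duplicate or otherwise invalid header), the packet is simply dropped. There is one extra step, though: every such packet increments TotalOutOfOrderPackets, and once that count reaches the threshold CVarNetPacketOrderCorrectionEnableThreshold (with net.DoPacketOrderCorrection enabled), the network is assumed to be in poor condition and an out-of-order packet queue, PacketOrderCache, is created: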
static TAutoConsoleVariable<int32> CVarNetDoPacketOrderCorrection(TEXT("net.DoPacketOrderCorrection"), 1,
TEXT("Whether or not to try to fix 'out of order' packet sequences, by caching packets and waiting for the missing sequence."));
static TAutoConsoleVariable<int32> CVarNetPacketOrderCorrectionEnableThreshold(TEXT("net.PacketOrderCorrectionEnableThreshold"), 1,
TEXT("The number of 'out of order' packet sequences that need to occur, before correction is enabled."));
static TAutoConsoleVariable<int32> CVarNetPacketOrderMaxCachedPackets(TEXT("net.PacketOrderMaxCachedPackets"), 32,
TEXT("(NOTE: Must be power of 2!) The maximum number of packets to cache while waiting for missing packet sequences, before treating missing packets as lost."));
if(PacketSequenceDelta > 0)
{
// ... (some code omitted)
}
else
{
TotalOutOfOrderPackets++;
Driver->InOutOfOrderPackets++;
if (!PacketOrderCache.IsSet() && CVarNetDoPacketOrderCorrection.GetValueOnAnyThread() != 0)
{
int32 EnableThreshold = CVarNetPacketOrderCorrectionEnableThreshold.GetValueOnAnyThread();
if (TotalOutOfOrderPackets >= EnableThreshold)
{
UE_LOG(LogNet, Verbose, TEXT("Hit threshold of %i 'out of order' packet sequences. Enabling out of order packet correction."), EnableThreshold);
int32 CacheSize = FMath::RoundUpToPowerOfTwo(CVarNetPacketOrderMaxCachedPackets.GetValueOnAnyThread());
PacketOrderCache.Emplace(CacheSize);
}
}
// Protect against replay attacks
// We already protect against this for reliable bunches, and unreliable properties
// The only bunch we would process would be unreliable RPC's, which could allow for replay attacks
// So rather than add individual protection for unreliable RPC's as well, just kill it at the source,
// which protects everything in one fell swoop
return;
}
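This initializes the capacity of the PacketOrderCache member, a TOptional wrapping a circular buffer. It is unset by default and only gets initialized once out-of-order packets have been seen as described above: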
/** Buffer of partially read (post-PacketHandler) sequenced packets, which are waiting for a missing packet/sequence */
TOptional<TCircularBuffer<TUniquePtr<FBitReader>>> PacketOrderCache;
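The code above rounds the cache size up to a power of two before creating the buffer, which allows index wrapping with a bit mask. Below is a minimal stand-in, assuming mask-based wrapping similar to the GetNextIndex and GetPreviousIndex calls used later. RingBuffer is a hypothetical class, not the engine's TCircularBuffer.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

template <typename T>
class RingBuffer {
public:
    explicit RingBuffer(uint32_t MinCapacity)
        : Slots(RoundUpToPowerOfTwo(MinCapacity)), Mask(uint32_t(Slots.size()) - 1) {}

    uint32_t Capacity() const { return uint32_t(Slots.size()); }
    // Wrapping is a bit mask because the capacity is a power of two.
    uint32_t GetNextIndex(uint32_t Index) const { return (Index + 1) & Mask; }
    uint32_t GetPreviousIndex(uint32_t Index) const { return (Index - 1) & Mask; }
    T& operator[](uint32_t Index) { return Slots[Index & Mask]; }

    static uint32_t RoundUpToPowerOfTwo(uint32_t V) {
        assert(V >= 1 && V <= (1u << 31));
        uint32_t P = 1;
        while (P < V) P <<= 1;
        return P;
    }

private:
    std::vector<T> Slots;   // declared before Mask, so it is initialized first
    uint32_t Mask;
};
```

With T = std::unique_ptr<...>, an empty slot is simply a null pointer, which is how the cache below tells free slots from occupied ones.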
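The elements of this circular buffer are TUniquePtr<FBitReader>, each holding the data of one Packet that arrived out of order, so this member serves as a temporary buffer for putting out-of-order packets back in order. With that in mind, let us revisit the PacketSequenceDelta > 0 branch. It starts by initializing several variables: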
static TAutoConsoleVariable<int32> CVarNetPacketOrderMaxMissingPackets(TEXT("net.PacketOrderMaxMissingPackets"), 3,
TEXT("The maximum number of missed packet sequences that is allowed, before treating missing packets as lost."));
const bool bPacketOrderCacheActive = !bFlushingPacketOrderCache && PacketOrderCache.IsSet();
const bool bCheckForMissingSequence = bPacketOrderCacheActive && PacketOrderCacheCount == 0;
const bool bFillingPacketOrderCache = bPacketOrderCacheActive && PacketOrderCacheCount > 0;
const int32 MaxMissingPackets = (bCheckForMissingSequence ? CVarNetPacketOrderMaxMissingPackets.GetValueOnAnyThread() : 0);
const int32 MissingPacketCount = PacketSequenceDelta - 1;
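There are quite a few variables here, so let us first explain what each one is for:
- bFlushingPacketOrderCache: whether the consecutive packets stored in PacketOrderCache are currently being flushed (replayed); for now it can be assumed to be false.
- PacketOrderCacheCount: the number of occupied slots in PacketOrderCache.
- bPacketOrderCacheActive: whether PacketOrderCache can currently be used to cache out-of-order packets.
- bCheckForMissingSequence: whether we are checking for a missing sequence; true when the cache is active and empty.
- bFillingPacketOrderCache: whether the cache is being filled; true when the cache is active and not empty.
- MaxMissingPackets: the maximum number of missing packets in between that can be handled; net.PacketOrderMaxMissingPackets (default 3) when bCheckForMissingSequence is true, otherwise 0.
- MissingPacketCount: the number of sequence numbers skipped between the expected next packet and the current one (PacketSequenceDelta - 1); 0 means this is exactly the next packet in order.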
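With these variables explained, we can follow the rest of the logic. It begins with a large conditional that lets two cases fall through: in-order arrival, and gaps that are too large. If the gap is too large, the missing packets are simply treated as lost (reliable data in them is resent once the loss is reported back to the sender), and PacketOrderCache is not used to hold the current packet: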
// Cache the packet if we are already caching, and begin caching if we just encountered a missing sequence, within range
if (bFillingPacketOrderCache || (bCheckForMissingSequence && MissingPacketCount > 0 && MissingPacketCount <= MaxMissingPackets))
{
// ... (some code omitted)
return;
}
if (MissingPacketCount > 10)
{
UE_LOG(LogNetTraffic, Verbose, TEXT("High single frame packet loss. PacketsLost: %i %s" ), MissingPacketCount, *Describe());
}
InPacketsLost += MissingPacketCount;
InTotalPacketsLost += MissingPacketCount;
Driver->InPacketsLost += MissingPacketCount;
Driver->InTotalPacketsLost += MissingPacketCount;
InPacketId += PacketSequenceDelta;
PacketAnalytics.TrackInPacket(InPacketId, MissingPacketCount);
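The if branch always ends with return. The code after it therefore runs only when the packet is not cached, i.e. when the cache is not in use, MissingPacketCount == 0, or MissingPacketCount > MaxMissingPackets. That code records loss statistics and advances InPacketId by PacketSequenceDelta, after which normal processing of the packet continues.
Inside the if branch, PacketOrderCache is filled. The code first checks whether the cache can hold this packet: if the computed LinearCacheIdx exceeds the capacity, it is clamped to CacheCapacity - 1.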
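The branch condition above combines several flags in one test. As a sketch, here it is restated as a pure function; FCacheState, EIncomingAction and ClassifyIncoming are hypothetical names, and the inputs correspond to the variables listed earlier.

```cpp
#include <cassert>
#include <cstdint>

enum class EIncomingAction { CacheForReorder, ProcessNow };

struct FCacheState {
    bool bCacheSet;      // PacketOrderCache.IsSet()
    bool bFlushing;      // bFlushingPacketOrderCache
    int32_t CacheCount;  // PacketOrderCacheCount
};

inline EIncomingAction ClassifyIncoming(int32_t PacketSequenceDelta, const FCacheState& S,
                                        int32_t MaxMissingPacketsCVar) {
    assert(PacketSequenceDelta > 0);  // only called on the "newer packet" path
    const bool bActive = !S.bFlushing && S.bCacheSet;
    const bool bCheckForMissing = bActive && S.CacheCount == 0;
    const bool bFilling = bActive && S.CacheCount > 0;
    const int32_t MaxMissing = bCheckForMissing ? MaxMissingPacketsCVar : 0;
    const int32_t Missing = PacketSequenceDelta - 1;
    if (bFilling || (bCheckForMissing && Missing > 0 && Missing <= MaxMissing)) {
        return EIncomingAction::CacheForReorder;
    }
    // In order, gap too large, cache unused, or a packet being replayed from the cache.
    return EIncomingAction::ProcessNow;
}
```

With the default limit of 3, an empty active cache caches a packet that skips 1 to 3 sequence numbers but processes one that skips 4 immediately and counts the gap as lost; once the cache is non-empty, every newer packet is cached until the next flush.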
int32 LinearCacheIdx = PacketSequenceDelta - 1;
int32 CacheCapacity = PacketOrderCache->Capacity();
bool bLastCacheEntry = LinearCacheIdx >= (CacheCapacity - 1);
// The last cache entry is only set, when we've reached capacity or when we receive a sequence which is out of bounds of the cache
LinearCacheIdx = bLastCacheEntry ? (CacheCapacity - 1) : LinearCacheIdx;
The start position of the circular buffer is stored in PacketOrderCacheStartIdx, so the slot for offset LinearCacheIdx is obtained by calling GetNextIndex repeatedly:
int32 CircularCacheIdx = PacketOrderCacheStartIdx;
for (int32 LinearDec=LinearCacheIdx; LinearDec > 0; LinearDec--)
{
CircularCacheIdx = PacketOrderCache->GetNextIndex(CircularCacheIdx);
}
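Once the final index CircularCacheIdx is known, if that slot is empty, a TUniquePtr copy of the current packet's reader is stored in it. If the slot is already occupied, the out-of-order packet is a duplicate and is ignored (it is only counted as out of order):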
TUniquePtr<FBitReader>& CurCachePacket = PacketOrderCache.GetValue()[CircularCacheIdx];
// Reset the reader to its initial position, and cache the packet
if (!CurCachePacket.IsValid())
{
UE_LOG(LogNet, VeryVerbose, TEXT("'Out of Order' Packet Cache, caching sequence order '%i' (capacity: %i)"), LinearCacheIdx, CacheCapacity);
CurCachePacket = MakeUnique<FBitReader>(Reader);
PacketOrderCacheCount++;
ResetReaderMark.Pop(*CurCachePacket);
}
else
{
TotalOutOfOrderPackets++;
Driver->InOutOfOrderPackets++;
}
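Because the capacity is a power of two, calling GetNextIndex LinearCacheIdx times from PacketOrderCacheStartIdx gives the same slot as adding the offset and masking with capacity - 1. The sketch below combines that with the clamping and the duplicate check. FReorderCacheSketch is hypothetical, and a plain sequence number stands in for the cached FBitReader.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

struct FReorderCacheSketch {
    std::vector<std::optional<uint32_t>> Slots;  // stands in for TUniquePtr<FBitReader>
    uint32_t StartIdx = 0;                       // PacketOrderCacheStartIdx
    int32_t Count = 0;                           // PacketOrderCacheCount
    int32_t Duplicates = 0;                      // counted as out of order

    explicit FReorderCacheSketch(uint32_t Capacity) : Slots(Capacity) {
        assert(Capacity > 0 && (Capacity & (Capacity - 1)) == 0);
    }

    // Caches PacketSeq and returns the linear index actually used (after clamping).
    int32_t Insert(int32_t PacketSequenceDelta, uint32_t PacketSeq) {
        const int32_t Capacity = int32_t(Slots.size());
        int32_t LinearIdx = PacketSequenceDelta - 1;
        // Sequences beyond the cache all land in the last slot (later forcing a full flush).
        if (LinearIdx >= Capacity - 1) LinearIdx = Capacity - 1;
        // Equivalent to applying GetNextIndex LinearIdx times from StartIdx.
        const uint32_t CircularIdx = (StartIdx + uint32_t(LinearIdx)) & uint32_t(Capacity - 1);
        if (!Slots[CircularIdx]) {
            Slots[CircularIdx] = PacketSeq;
            ++Count;
        } else {
            ++Duplicates;
        }
        return LinearIdx;
    }
};
```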
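Once a packet has been placed in this cache it is not processed immediately. Instead it waits for FlushPacketOrderCache, which replays cached packets in ascending order. This function is called right after ReceivedPacket in ReceivedRawPacket, and again in PostTickDispatch with bFlushWholeCache set to true. FlushPacketOrderCache feeds each cached packet back through ReceivedPacket: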
void UNetConnection::ReceivedRawPacket( void* InData, int32 Count )
{
// ... (a lot of code omitted)
if (Reader.GetBitsLeft() > 0)
{
ReceivedPacket(Reader);
// Check if the out of order packet cache needs flushing
FlushPacketOrderCache();
}
}
void UNetConnection::PostTickDispatch()
{
if (!IsInternalAck())
{
#if DO_ENABLE_NET_TEST
ReinjectDelayedPackets();
#endif
FlushPacketOrderCache(/*bFlushWholeCache=*/true);
PacketAnalytics.Tick();
}
}
void UNetConnection::FlushPacketOrderCache(bool bFlushWholeCache/*=false*/)
{
if (PacketOrderCache.IsSet() && PacketOrderCacheCount > 0)
{
TCircularBuffer<TUniquePtr<FBitReader>>& Cache = PacketOrderCache.GetValue();
int32 CacheEndIdx = PacketOrderCache->GetPreviousIndex(PacketOrderCacheStartIdx);
bool bEndOfCacheSet = Cache[CacheEndIdx].IsValid();
bFlushingPacketOrderCache = true;
// If the end of the cache has had its value set, this forces the flushing of the whole cache, no matter how many missing sequences there are.
// The reason for this (other than making space in the cache), is that when we receive a sequence that is out of range of the cache,
// it is stored at the end, and so the cache index no longer lines up with the sequence number - which it needs to.
bFlushWholeCache = bFlushWholeCache || bEndOfCacheSet;
while (PacketOrderCacheCount > 0)
{
TUniquePtr<FBitReader>& CurCachePacket = Cache[PacketOrderCacheStartIdx];
if (CurCachePacket.IsValid())
{
UE_LOG(LogNet, VeryVerbose, TEXT("'Out of Order' Packet Cache, replaying packet with cache index: %i (bFlushWholeCache: %i)"), PacketOrderCacheStartIdx, (int32)bFlushWholeCache);
ReceivedPacket(*CurCachePacket.Get());
CurCachePacket.Reset();
PacketOrderCacheCount--;
}
// Advance the cache only up to the first missing packet, unless flushing the whole cache
else if (!bFlushWholeCache)
{
break;
}
PacketOrderCacheStartIdx = PacketOrderCache->GetNextIndex(PacketOrderCacheStartIdx);
}
bFlushingPacketOrderCache = false;
}
}
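Note that cached packets are processed even when their sequence numbers are not contiguous. The flush triggered from ReceivedRawPacket stops at the first missing slot, but the flush in PostTickDispatch passes bFlushWholeCache=true (and a filled last slot forces the same behavior), so by the end of the frame the whole cache is replayed. For example, if InSeq is 10 and PacketOrderCache holds packets 12 and 14, both are handled by ReceivedPacket in order by the end of the tick, without waiting for packets 11 and 13 to arrive. In other words, PacketOrderCache only holds out-of-order packets temporarily so that packets that have already arrived are not discarded. Without PacketOrderCache, if packet 14 arrives before packet 12, packet 14 is processed first; packet 12 is then recognized as older than 14 and dropped, and its reliable data has to wait for retransmission.
For in-order arrival (MissingPacketCount == 0), the large conditional is skipped and processing continues directly: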
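To make the InSeq = 10 example concrete, the following hypothetical simulation mirrors the FlushPacketOrderCache loop on a cache of sequence numbers. FFlushSketch is not an engine type, and it assumes the cache start index points at the slot for InSeq + 1, so slot i holds sequence InSeq + 1 + i.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

struct FFlushSketch {
    std::vector<std::optional<uint32_t>> Slots;  // capacity must be a power of two
    uint32_t StartIdx = 0;
    int32_t Count = 0;
    std::vector<uint32_t> Replayed;              // what ReceivedPacket would see, in order

    explicit FFlushSketch(uint32_t Capacity) : Slots(Capacity) {}

    void Flush(bool bFlushWholeCache) {
        const uint32_t Mask = uint32_t(Slots.size()) - 1;
        const uint32_t EndIdx = (StartIdx - 1) & Mask;  // GetPreviousIndex(StartIdx)
        // A filled last slot forces a full flush, as in the engine code.
        bFlushWholeCache = bFlushWholeCache || Slots[EndIdx].has_value();
        while (Count > 0) {
            std::optional<uint32_t>& Cur = Slots[StartIdx];
            if (Cur) {
                Replayed.push_back(*Cur);
                Cur.reset();
                --Count;
            } else if (!bFlushWholeCache) {
                break;  // stop at the first gap
            }
            StartIdx = (StartIdx + 1) & Mask;
        }
    }
};
```

Caching 12 and 14 in slots 1 and 3, a flush with bFlushWholeCache = false replays nothing because the slot for 11 is empty, while the end-of-tick flush replays 12 and then 14 and leaves StartIdx at the slot for 15.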
// Lambda to dispatch delivery notifications,
auto HandlePacketNotification = [&Header, &ChannelsToClose, this](FNetPacketNotify::SequenceNumberT AckedSequence, bool bDelivered)
{
// Increase LastNotifiedPacketId, this is a full packet Id
++LastNotifiedPacketId;
++OutTotalNotifiedPackets;
Driver->IncreaseOutTotalNotifiedPackets();
// Sanity check
if (FNetPacketNotify::SequenceNumberT(LastNotifiedPacketId) != AckedSequence)
{
CLOSE_CONNECTION_DUE_TO_SECURITY_VIOLATION(this, ESecurityEvent::Malformed_Packet, TEXT("LastNotifiedPacketId != AckedSequence"));
return;
}
if (bDelivered)
{
ReceivedAck(LastNotifiedPacketId, ChannelsToClose);
}
else
{
ReceivedNak(LastNotifiedPacketId);
};
};
// Update incoming sequence data and deliver packet notifications
// Packet is only accepted if both the incoming sequence number and incoming ack data are valid
PacketNotify.Update(Header, HandlePacketNotification);
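PacketNotify.Update parses the delivery-status history sent by the peer and calls HandlePacketNotification above once per packet, in sequence order. Depending on whether the packet was delivered (bDelivered), HandlePacketNotification then calls ReceivedAck or ReceivedNak: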
template<class Functor>
FNetPacketNotify::SequenceNumberT::DifferenceT FNetPacketNotify::Update(const FNotificationHeader& NotificationData, Functor&& InFunc)
{
const SequenceNumberT::DifferenceT InSeqDelta = GetSequenceDelta(NotificationData);
if (InSeqDelta > 0)
{
UE_LOG_PACKET_NOTIFY(TEXT("FNetPacketNotify::Update - Seq %u, InSeq %u"), NotificationData.Seq.Get(), InSeq.Get());
ProcessReceivedAcks(NotificationData, InFunc);
// accept sequence
InSeq = NotificationData.Seq;
return InSeqDelta;
}
else
{
return 0;
}
}
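Update itself only checks the sequence delta and accepts the new InSeq; the actual walk over the delivery-status history happens in FNetPacketNotify::ProcessReceivedAcks, which Update calls first. ProcessReceivedAcks compares the acknowledged sequence number in the header, NotificationData.AckedSeq, with OutAckSeq, the last sequence number already known to be acknowledged. The resulting difference AckCount is first passed to UpdateInAckSeqAck, which updates InAckSeqAck: it looks up in the AckRecord array the ACK sequence number that packet NotificationData.AckedSeq carried when it was sent (see the earlier discussion of UpdateInAckSeqAck):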
template<class Functor>
void FNetPacketNotify::ProcessReceivedAcks(const FNotificationHeader& NotificationData, Functor&& InFunc)
{
if (NotificationData.AckedSeq > OutAckSeq)
{
UE_LOG_PACKET_NOTIFY(TEXT("Notification::ProcessReceivedAcks - AckedSeq: %u, OutAckSeq: %u"), NotificationData.AckedSeq.Get(), OutAckSeq.Get());
SequenceNumberT::DifferenceT AckCount = SequenceNumberT::Diff(NotificationData.AckedSeq, OutAckSeq);
// Update InAckSeqAck used to track the needed number of bits to transmit our ack history
InAckSeqAck = UpdateInAckSeqAck(AckCount, NotificationData.AckedSeq);
// ExpectedAck = OutAckSeq + 1
SequenceNumberT CurrentAck(OutAckSeq);
++CurrentAck;
// Warn if the received sequence number is greater than our history buffer, since if that is the case we have to treat the data as lost.
if (AckCount > (SequenceNumberT::DifferenceT)(SequenceHistoryT::Size))
{
UE_LOG_PACKET_NOTIFY_WARNING(TEXT("Notification::ProcessReceivedAcks - Missed Acks: AckedSeq: %u, OutAckSeq: %u, FirstMissingSeq: %u Count: %u"), NotificationData.AckedSeq.Get(), OutAckSeq.Get(), CurrentAck.Get(), AckCount - (SequenceNumberT::DifferenceT)(SequenceHistoryT::Size));
}
// Everything not found in the history buffer is treated as lost
while (AckCount > (SequenceNumberT::DifferenceT)(SequenceHistoryT::Size))
{
--AckCount;
InFunc(CurrentAck, false);
++CurrentAck;
}
// For sequence numbers contained in the history we lookup the delivery status from the history
while (AckCount > 0)
{
--AckCount;
UE_LOG_PACKET_NOTIFY(TEXT("Notification::ProcessReceivedAcks Seq: %u - IsAck: %u HistoryIndex: %u"), CurrentAck.Get(), NotificationData.History.IsDelivered(AckCount) ? 1u : 0u, AckCount);
InFunc(CurrentAck, NotificationData.History.IsDelivered(AckCount));
++CurrentAck;
}
OutAckSeq = NotificationData.AckedSeq;
}
}
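Next, the delivery status of earlier packets is read from the history, NotificationData.History. This history holds at most 256 entries, so packets too old to be recorded there are treated as lost, which is what InFunc(CurrentAck, false) does (false meaning the packet was lost). For the remaining packets, IsDelivered(AckCount) returns the status recorded for CurrentAck in NotificationData.History, and the corresponding InFunc is called.
For a Packet reported as lost, the connection uses ChannelRecord to find every Channel that wrote data into that Packet, and each of those channels resends the reliable Bunches it had placed in it. A Bunch resent through SendRawBunch is assigned a new PacketId; the old PacketId is not reused: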
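The two loops in ProcessReceivedAcks can be restated without UE types. This sketch assumes plain, non-wrapping integers for sequence numbers and a std::bitset in which bit i is the delivery status of packet AckedSeq - i, matching the IsDelivered(AckCount) indexing above; ProcessAcksSketch and FNotify are hypothetical names.

```cpp
#include <bitset>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

constexpr int32_t kHistorySize = 256;      // assumed to match SequenceHistoryT::Size
using FNotify = std::pair<int32_t, bool>;  // (sequence, delivered)

inline std::vector<FNotify> ProcessAcksSketch(int32_t& OutAckSeq, int32_t AckedSeq,
                                              const std::bitset<kHistorySize>& History) {
    std::vector<FNotify> Notified;  // one entry per InFunc call, oldest first
    if (AckedSeq <= OutAckSeq) return Notified;
    int32_t AckCount = AckedSeq - OutAckSeq;
    int32_t CurrentAck = OutAckSeq + 1;
    // Anything older than the history window is reported as lost.
    while (AckCount > kHistorySize) {
        --AckCount;
        Notified.emplace_back(CurrentAck++, false);
    }
    // The rest is looked up in the history; bit 0 is AckedSeq itself.
    while (AckCount > 0) {
        --AckCount;
        Notified.emplace_back(CurrentAck++, History.test(std::size_t(AckCount)));
    }
    OutAckSeq = AckedSeq;
    return Notified;
}
```

For example, with OutAckSeq = 5, AckedSeq = 8 and bits 0 and 2 set, the callbacks are (6, delivered), (7, lost), (8, delivered), and OutAckSeq becomes 8.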
// Called with a PacketId that the other side reported as lost
void UNetConnection::ReceivedNak( int32 NakPacketId )
{
auto NakChannelFunc = [this](int32 NackedPacketId, uint32 ChannelIndex)
{
UChannel* const Channel = Channels[ChannelIndex];
if (Channel)
{
Channel->ReceivedNak(NackedPacketId);
if (Channel->OpenPacketId.InRange(NackedPacketId))
{
Channel->ReceivedAcks(); //warning: May destroy Channel.
}
}
};
// Invoke NakChannelFunc on all channels written for this PacketId
FChannelRecordImpl::ConsumeChannelRecordsForPacket(ChannelRecord, NakPacketId, NakChannelFunc);
}
void UChannel::ReceivedNak( int32 NakPacketId )
{
// Trigger retransmission
for( FOutBunch* Out=OutRec; Out; Out=Out->Next )
{
// Retransmit reliable bunches in the lost packet.
if( Out->PacketId==NakPacketId && !Out->ReceivedAck )
{
check(Out->bReliable);
UE_LOG(LogNetTraffic, Log, TEXT(" Channel %i nak); resending %i..."), Out->ChIndex, Out->ChSequence );
Connection->SendRawBunch( *Out, 0 );
}
}
}
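A minimal sketch of the retransmission rule in UChannel::ReceivedNak follows. It assumes a std::list stands in for the OutRec linked list and that every resent Bunch is written into the packet currently being assembled (CurrentPacketId, a hypothetical field), which is how it picks up a new PacketId. FOutBunchSketch and FReliableSenderSketch are hypothetical types.

```cpp
#include <cassert>
#include <cstdint>
#include <list>
#include <vector>

struct FOutBunchSketch {
    int32_t ChSequence;
    int32_t PacketId;             // packet this bunch was last sent in
    bool bReceivedAck = false;
};

struct FReliableSenderSketch {
    std::list<FOutBunchSketch> OutRec;   // ascending ChSequence
    int32_t CurrentPacketId = 0;         // hypothetical: id of the packet being assembled

    // Stands in for SendRawBunch: the bunch is written into the current packet.
    void SendRaw(FOutBunchSketch& Bunch) { Bunch.PacketId = CurrentPacketId; }

    // Resends unacknowledged bunches from the lost packet; returns their ChSequence values.
    std::vector<int32_t> OnNak(int32_t NakPacketId) {
        std::vector<int32_t> Resent;
        for (FOutBunchSketch& Out : OutRec) {
            if (Out.PacketId == NakPacketId && !Out.bReceivedAck) {
                SendRaw(Out);
                Resent.push_back(Out.ChSequence);
            }
        }
        return Resent;
    }
};
```

Because the resent Bunch records its new PacketId, a later NAK for the old packet finds nothing to resend, while a NAK for the new packet would trigger another retransmission.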
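OutRec is the ordered list of reliable Bunches maintained when they were sent, sorted by increasing ChSequence. ChSequence is an int32, so sequence wrap-around is not a practical concern here. To reduce bandwidth, however, only ChSequence modulo 1024 is written into the packet, i.e. only the low 10 bits of the int32: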
enum { MAX_CHSEQUENCE = 1024 }; // Power of 2 >RELIABLE_BUFFER, covering loss/misorder time.
void FBitWriter::WriteIntWrapped(uint32 Value, uint32 ValueMax)
{
// For a power-of-two ValueMax (such as MAX_CHSEQUENCE) this writes exactly the low LengthBits bits of Value; it does not assert or clamp Value < ValueMax
const int32 LengthBits = FMath::CeilLogTwo(ValueMax);
if (AllowAppend(LengthBits))
{
uint32 NewValue = 0;
for (uint32 Mask=1; NewValue+Mask < ValueMax && Mask; Mask*=2, Num++)
{
if (Value & Mask)
{
Buffer[Num>>3] += GShift[Num&7];
NewValue += Mask;
}
}
}
}
int32 UNetConnection::SendRawBunch(FOutBunch& Bunch, bool InAllowMerge, const FNetTraceCollector* BunchCollector)
{
// ... (a lot of code omitted)
if (Bunch.bReliable && !IsInternalAck())
{
SendBunchHeader.WriteIntWrapped(Bunch.ChSequence, MAX_CHSEQUENCE);
}
}
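Correspondingly, when this reliable Bunch is received, the 10-bit value is read back as the low 10 bits of ChSequence, and the full 32-bit ChSequence is reconstructed relative to InReliable[Bunch.ChIndex], the highest sequence number received in order so far, by choosing the candidate that lies within -512 to +511 of it: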
// Return the value of -Max/2 <= Value-Reference+some_integer*Max < Max/2.
inline int32 BestSignedDifference( int32 Value, int32 Reference, int32 Max )
{
return ((Value-Reference+Max/2) & (Max-1)) - Max/2;
}
inline int32 MakeRelative( int32 Value, int32 Reference, int32 Max )
{
return Reference + BestSignedDifference(Value,Reference,Max);
}
void UNetConnection::ReceivedPacket( FBitReader& Reader, bool bIsReinjectedPacket)
{
// ... (a lot of code omitted)
if ( Bunch.bReliable )
{
if ( IsInternalAck() )
{
// We can derive the sequence for 100% reliable connections
Bunch.ChSequence = InReliable[Bunch.ChIndex] + 1;
}
else
{
// If this is a reliable bunch, use the last processed reliable sequence to read the new reliable sequence
Bunch.ChSequence = MakeRelative( Reader.ReadInt( MAX_CHSEQUENCE ), InReliable[Bunch.ChIndex], MAX_CHSEQUENCE );
}
}
}
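The wire encoding and the reconstruction can be checked with a few numbers. The sketch below reuses the BestSignedDifference and MakeRelative formulas quoted above. EncodeChSequence is a hypothetical helper standing in for WriteIntWrapped with MAX_CHSEQUENCE, and the uint32_t casts are an addition that keeps the bitwise AND well defined for negative intermediate values.

```cpp
#include <cassert>
#include <cstdint>

constexpr int32_t MAX_CHSEQUENCE = 1024;

// Only the low 10 bits go on the wire.
inline uint32_t EncodeChSequence(int32_t ChSequence) {
    return uint32_t(ChSequence) & uint32_t(MAX_CHSEQUENCE - 1);
}

// Result lies in [-Max/2, Max/2); Max must be a power of two.
inline int32_t BestSignedDifference(int32_t Value, int32_t Reference, int32_t Max) {
    return int32_t(uint32_t(Value - Reference + Max / 2) & uint32_t(Max - 1)) - Max / 2;
}

inline int32_t MakeRelative(int32_t Value, int32_t Reference, int32_t Max) {
    return Reference + BestSignedDifference(Value, Reference, Max);
}
```

For example, ChSequence 1026 is sent as 2; with InReliable at 1023, MakeRelative(2, 1023, 1024) recovers 1026. Reconstruction is only unambiguous while the sender never runs more than MAX_CHSEQUENCE/2 ahead of the receiver, which RELIABLE_BUFFER (256) guarantees.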
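For a Packet confirmed as received, UNetConnection::ReceivedAck runs. It uses ChannelRecord to visit every Channel that wrote into this Packet, and sets ReceivedAck to 1 on each of that channel's Bunches carrying this PacketId: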
void UNetConnection::ReceivedAck(int32 AckPacketId, FChannelsToClose& OutChannelsToClose)
{
UE_LOG(LogNetTraffic, Verbose, TEXT(" Received ack %i"), AckPacketId);
auto AckChannelFunc = [this, &OutChannelsToClose](int32 AckedPacketId, uint32 ChannelIndex)
{
UChannel* const Channel = Channels[ChannelIndex];
if (Channel)
{
if (Channel->OpenPacketId.Last == AckedPacketId) // Necessary for unreliable "bNetTemporary" channels.
{
Channel->OpenAcked = 1;
}
for (FOutBunch* OutBunch = Channel->OutRec; OutBunch; OutBunch = OutBunch->Next)
{
if (OutBunch->PacketId == AckedPacketId)
{
OutBunch->ReceivedAck = 1;
}
}
Channel->ReceivedAck(AckedPacketId);
EChannelCloseReason CloseReason;
if (Channel->ReceivedAcks(CloseReason))
{
const FChannelCloseInfo Info = {ChannelIndex, CloseReason};
OutChannelsToClose.Emplace(Info);
}
}
};
// Invoke AckChannelFunc on all channels written for this PacketId
FChannelRecordImpl::ConsumeChannelRecordsForPacket(ChannelRecord, AckPacketId, AckChannelFunc);
}
void UChannel::ReceivedAck( int32 AckPacketId )
{
// Do nothing. Most channels deal with this in Tick().
}
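UChannel::ReceivedAck itself does nothing, though. The ReceivedAck flag is consumed later in UChannel::ReceivedAcks, which walks the front of the OutRec list and deletes every leading Bunch marked ReceivedAck == 1: its data has been acknowledged and will never be retransmitted, so its memory can be released. The walk stops at the first Bunch that has not been acknowledged: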
bool UChannel::ReceivedAcks(EChannelCloseReason& OutCloseReason)
{
check(Connection->Channels[ChIndex]==this);
/*
// Verify in sequence.
for( FOutBunch* Out=OutRec; Out && Out->Next; Out=Out->Next )
check(Out->Next->ChSequence>Out->ChSequence);
*/
// Release all acknowledged outgoing queued bunches.
bool bCleanup = false;
EChannelCloseReason CloseReason = EChannelCloseReason::Destroyed;
while( OutRec && OutRec->ReceivedAck )
{
if (OutRec->bOpen)
{
// Handling of the open bunch omitted
}
bCleanup = bCleanup || !!OutRec->bClose;
if (OutRec->bClose)
{
CloseReason = OutRec->CloseReason;
}
FOutBunch* Release = OutRec;
OutRec = OutRec->Next;
delete Release;
NumOutRec--;
}
// ... (some code omitted)
return false;
}
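The split between marking (ReceivedAck) and releasing (ReceivedAcks) can be sketched with a std::deque in place of the OutRec linked list; FPendingBunch and FOutRecSketch are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

struct FPendingBunch {
    int32_t ChSequence;
    int32_t PacketId;
    bool bReceivedAck = false;
};

struct FOutRecSketch {
    std::deque<FPendingBunch> OutRec;  // ascending ChSequence

    // Like the loop in UNetConnection::ReceivedAck: only marks bunches.
    void MarkAcked(int32_t AckPacketId) {
        for (FPendingBunch& B : OutRec) {
            if (B.PacketId == AckPacketId) B.bReceivedAck = true;
        }
    }

    // Like UChannel::ReceivedAcks: frees acknowledged bunches from the front only.
    int32_t ReleaseAcked() {
        int32_t Released = 0;
        while (!OutRec.empty() && OutRec.front().bReceivedAck) {
            OutRec.pop_front();
            ++Released;
        }
        return Released;
    }
};
```

If packet 11 is ACKed before packet 10, the Bunches from packet 11 stay in the list until the Bunch at the front (from packet 10) is ACKed as well; then all of them are released at once.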
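Everything so far deals with the ACK information carried in a Packet; after the ACKs, every Bunch in the Packet still has to be processed. As covered in the earlier section on receiving messages, Bunches are received in UChannel::ReceivedRawBunch. Since in-order Bunch handling was already described there, here we only need to look at how reliable Bunches that arrive out of order are handled: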
void UChannel::ReceivedRawBunch( FInBunch & Bunch, bool & bOutSkipAck )
{
// ... (some code omitted)
if ( Bunch.bReliable && Bunch.ChSequence != Connection->InReliable[ChIndex] + 1 )
{
// We shouldn't hit this path on 100% reliable connections
check( !Connection->IsInternalAck() );
// If this bunch has a dependency on a previous unreceived bunch, buffer it.
checkSlow(!Bunch.bOpen);
// Verify that UConnection::ReceivedPacket has passed us a valid bunch.
check(Bunch.ChSequence>Connection->InReliable[ChIndex]);
// Find the place for this item, sorted in sequence.
UE_LOG(LogNetTraffic, Log, TEXT(" Queuing bunch with unreceived dependency: %d / %d"), Bunch.ChSequence, Connection->InReliable[ChIndex]+1 );
FInBunch** InPtr;
for( InPtr=&InRec; *InPtr; InPtr=&(*InPtr)->Next )
{
if( Bunch.ChSequence==(*InPtr)->ChSequence )
{
// Already queued.
return;
}
else if( Bunch.ChSequence<(*InPtr)->ChSequence )
{
// Stick before this one.
break;
}
}
FInBunch* New = new FInBunch(Bunch);
New->Next = *InPtr;
*InPtr = New;
NumInRec++;
if ( NumInRec >= RELIABLE_BUFFER )
{
Bunch.SetError();
UE_LOG( LogNetTraffic, Error, TEXT( "UChannel::ReceivedRawBunch: Too many reliable messages queued up" ) );
return;
}
checkSlow(NumInRec<=RELIABLE_BUFFER);
//AssertInSequenced();
}
// ... (some code omitted)
}
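This logic inserts the current Bunch into InRec, the ordered list of received Bunches, so that InRec stays sorted by ascending ChSequence; a Bunch that is already queued is ignored. InRec can hold at most 255 Bunches (RELIABLE_BUFFER - 1); beyond that, the Bunch is flagged as an error.
Later, when a reliable Bunch that can be processed arrives, i.e. Bunch.ChSequence == Connection->InReliable[ChIndex] + 1, ReceivedNextBunch handles it and increments Connection->InReliable[ChIndex]. A while loop then walks InRec and dispatches every queued Bunch whose sequence number continues on from Connection->InReliable[ChIndex]: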
void UChannel::ReceivedRawBunch( FInBunch & Bunch, bool & bOutSkipAck )
{
// ... (some code omitted)
if ( Bunch.bReliable && Bunch.ChSequence != Connection->InReliable[ChIndex] + 1 )
{
// ... (some code omitted)
}
else
{
bool bDeleted = ReceivedNextBunch( Bunch, bOutSkipAck );
if ( Bunch.IsError() )
{
UE_LOG( LogNetTraffic, Error, TEXT( "UChannel::ReceivedRawBunch: Bunch.IsError() after ReceivedNextBunch 1" ) );
return;
}
if (bDeleted)
{
return;
}
// Dispatch any waiting bunches.
while( InRec )
{
// We shouldn't hit this path on 100% reliable connections
check( !Connection->IsInternalAck() );
if( InRec->ChSequence!=Connection->InReliable[ChIndex]+1 )
break;
UE_LOG(LogNetTraffic, Log, TEXT(" Channel %d Unleashing queued bunch"), ChIndex );
FInBunch* Release = InRec;
InRec = InRec->Next;
NumInRec--;
// Just keep a local copy of the bSkipAck flag, since these have already been acked and it doesn't make sense on this context
// Definitely want to warn when this happens, since it's really not possible
bool bLocalSkipAck = false;
bDeleted = ReceivedNextBunch( *Release, bLocalSkipAck );
if ( bLocalSkipAck )
{
UE_LOG( LogNetTraffic, Warning, TEXT( "UChannel::ReceivedRawBunch: bLocalSkipAck == true for already acked packet" ) );
}
if ( Bunch.IsError() )
{
UE_LOG( LogNetTraffic, Error, TEXT( "UChannel::ReceivedRawBunch: Bunch.IsError() after ReceivedNextBunch 2" ) );
return;
}
delete Release;
if (bDeleted)
{
return;
}
//AssertInSequenced();
}
}
}
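While walking InRec, each Bunch that is the next expected reliable Bunch is handled by ReceivedNextBunch, which updates Connection->InReliable[ChIndex]. As soon as the sequence breaks, i.e. InRec->ChSequence != Connection->InReliable[ChIndex] + 1, the walk stops.
Flow Control
Because the size of the OutRec list is capped at RELIABLE_BUFFER, UChannel::SendBunch( FOutBunch* Bunch, bool Merge ) checks, after splitting the data into partial Bunches, whether OutRec can still accept the reliable Bunches about to be sent. If they would exceed RELIABLE_BUFFER, the connection is closed: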
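The InRec logic amounts to a per-channel reorder buffer. The following is a sketch under simplifying assumptions: a std::map keyed by ChSequence replaces the sorted InRec linked list, payloads are strings, and the duplicate check (ChSequence <= InReliable) is folded in here, although judging by the check() in ReceivedRawBunch, UE performs it earlier in UNetConnection::ReceivedPacket. FReliableReceiverSketch is hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

constexpr int32_t RELIABLE_BUFFER = 256;

struct FReliableReceiverSketch {
    int32_t InReliable = 0;                  // last ChSequence delivered in order
    std::map<int32_t, std::string> InRec;    // ChSequence -> payload, ascending
    std::vector<std::string> Delivered;      // what ReceivedNextBunch would process
    bool bError = false;

    void Receive(int32_t ChSequence, const std::string& Payload) {
        if (ChSequence <= InReliable) return;       // already processed: duplicate
        if (ChSequence != InReliable + 1) {
            InRec.emplace(ChSequence, Payload);     // no-op if already queued
            if (int32_t(InRec.size()) >= RELIABLE_BUFFER) bError = true;
            return;
        }
        Deliver(Payload);
        // Dispatch any waiting bunches that are now contiguous.
        while (!InRec.empty() && InRec.begin()->first == InReliable + 1) {
            Deliver(InRec.begin()->second);
            InRec.erase(InRec.begin());
        }
    }

private:
    void Deliver(const std::string& Payload) {
        ++InReliable;
        Delivered.push_back(Payload);
    }
};
```

Receiving 3, then 2, then 1 delivers nothing until 1 arrives, after which "a", "b" and "c" are delivered in a single call.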
enum { RELIABLE_BUFFER = 256 }; // Power of 2 >= 1.
FPacketIdRange UChannel::SendBunch( FOutBunch* Bunch, bool Merge )
{
// The preceding code splits the Bunch into partial bunches if needed and puts the results into the OutgoingBunches array
FPacketIdRange PacketIdRange;
const bool bOverflowsReliable = (NumOutRec + OutgoingBunches.Num() >= RELIABLE_BUFFER + Bunch->bClose);
if ((GCVarNetPartialBunchReliableThreshold > 0) && (OutgoingBunches.Num() >= GCVarNetPartialBunchReliableThreshold) && !Connection->IsInternalAck())
{
if (!bOverflowsReliable)
{
UE_LOG(LogNetPartialBunch, Log, TEXT(" OutgoingBunches.Num (%d) exceeds reliable threashold (%d). Making bunches reliable. Property replication will be paused on this channel until these are ACK'd."), OutgoingBunches.Num(), GCVarNetPartialBunchReliableThreshold);
Bunch->bReliable = true;
bPausedUntilReliableACK = true;
}
else
{
// The threshold was hit, but making these reliable would overflow the reliable buffer. This is a problem: there is just too much data.
UE_LOG(LogNetPartialBunch, Warning, TEXT(" OutgoingBunches.Num (%d) exceeds reliable threashold (%d) but this would overflow the reliable buffer! Consider sending less stuff. Channel: %s"), OutgoingBunches.Num(), GCVarNetPartialBunchReliableThreshold, *Describe());
}
}
if (Bunch->bReliable && bOverflowsReliable)
{
UE_LOG(LogNetPartialBunch, Warning, TEXT("SendBunch: Reliable partial bunch overflows reliable buffer! %s"), *Describe() );
UE_LOG(LogNetPartialBunch, Warning, TEXT(" Num OutgoingBunches: %d. NumOutRec: %d"), OutgoingBunches.Num(), NumOutRec );
PrintReliableBunchBuffer();
// Bail out, we can't recover from this (without increasing RELIABLE_BUFFER)
FString ErrorMsg = NSLOCTEXT("NetworkErrors", "ClientReliableBufferOverflow", "Outgoing reliable buffer overflow").ToString();
FNetControlMessage<NMT_Failure>::Send(Connection, ErrorMsg);
Connection->FlushNet(true);
Connection->Close();
return PacketIdRange;
}
// The following code sends the split Bunches via SendRawBunch
}
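There is one special case here. If the Bunch being sent is split into at least GCVarNetPartialBunchReliableThreshold partial Bunches (the threshold must be greater than 0, and the connection must not use internal ACKs), an unreliable Bunch is converted into a reliable one, provided that doing so does not push OutRec past RELIABLE_BUFFER. In that case bPausedUntilReliableACK is also set to true, which pauses property replication for this channel's Actor until every reliable Bunch it has sent has been ACKed (NumOutRec drops to 0):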
uint32 bPausedUntilReliableACK:1; // Unreliable property replication is paused until all reliables are ack'd.
int64 UActorChannel::ReplicateActor()
{
SCOPE_CYCLE_COUNTER(STAT_NetReplicateActorTime);
// ... (a lot of code omitted)
if (bPausedUntilReliableACK)
{
if (NumOutRec > 0)
{
return 0;
}
bPausedUntilReliableACK = 0;
UE_LOG(LogNet, Verbose, TEXT("ReplicateActor: bPausedUntilReliableACK is ending now that reliables have been ACK'd. %s"), *Describe());
}
// ... (a lot of code omitted)
}
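Flow control in this path is therefore built around reliable Bunches. Unreliable Bunches are not kept in OutRec after being sent, so they never count against RELIABLE_BUFFER. When an unreliable update grows so large that it is split into many partial Bunches, however, it is promoted to reliable, and the Actor's property replication is paused until all outstanding reliable Bunches are ACKed, which throttles further property updates on that channel. This is safe because reliable Bunches cannot be dropped, whereas property replication is diff-based: pausing it for a while and then resuming the diff does not affect the correctness of the properties on the client.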
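The two flow-control checks can be restated as plain functions to see how they interact. This is a sketch: DecideSend, ESendDecision and ShouldSkipReplication are hypothetical names, and the parameters mirror NumOutRec, OutgoingBunches.Num(), Bunch->bClose and GCVarNetPartialBunchReliableThreshold from the code above.

```cpp
#include <cassert>
#include <cstdint>

constexpr int32_t RELIABLE_BUFFER = 256;

enum class ESendDecision { Send, SendAsReliableAndPause, CloseConnection };

inline ESendDecision DecideSend(bool bReliable, bool bClose, int32_t NumOutRec,
                                int32_t NumOutgoingBunches, int32_t PartialReliableThreshold,
                                bool bInternalAck) {
    const bool bOverflowsReliable =
        NumOutRec + NumOutgoingBunches >= RELIABLE_BUFFER + (bClose ? 1 : 0);
    bool bPause = false;
    // Large split bunches are promoted to reliable, unless that would overflow OutRec.
    if (PartialReliableThreshold > 0 && NumOutgoingBunches >= PartialReliableThreshold &&
        !bInternalAck && !bOverflowsReliable) {
        bReliable = true;
        bPause = true;  // corresponds to bPausedUntilReliableACK = true
    }
    if (bReliable && bOverflowsReliable) return ESendDecision::CloseConnection;
    return bPause ? ESendDecision::SendAsReliableAndPause : ESendDecision::Send;
}

// ReplicateActor's gate: stay paused while any reliable bunch is unacknowledged.
inline bool ShouldSkipReplication(bool& bPausedUntilReliableACK, int32_t NumOutRec) {
    if (!bPausedUntilReliableACK) return false;
    if (NumOutRec > 0) return true;
    bPausedUntilReliableACK = false;
    return false;
}
```

With an example threshold of 8 (an arbitrary value, not the engine default), an unreliable update split into 8 partial Bunches on an empty OutRec is promoted and pauses replication, whereas the same update with 250 Bunches already in OutRec stays unreliable because promotion would overflow the buffer.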