Просмотр исходного кода

调整消息跟MemoryBuffer的回收代码

tanghai 2 лет назад
Родитель
Сommit
4b07c0e004

+ 17 - 21
Unity/Assets/Scripts/Core/Module/Network/AService.cs

@@ -12,35 +12,31 @@ namespace ET
         
         private (MessageObject Message, MemoryBuffer MemoryStream) lastMessageInfo;
         
-        // 缓存上一个发送的消息,这样广播消息的时候省掉多次序列化,这样有个另外的问题,客户端就不能保存发送的消息来减少gc
-        // 需要广播的消息不能用对象池,因为用了对象池无法使用ReferenceEquals去判断是否是同一个消息,服务端gc很强,这种消息不用池问题不大
+        // 缓存上一个发送的消息,这样广播消息的时候省掉多次序列化
         public MemoryBuffer Fetch(MessageObject message)
         {
-            MemoryBuffer stream;
-            if (!message.IsFromPool) // 不是从池中来的消息, 服务端需要广播的消息不能用对象池
+            if (ReferenceEquals(message, this.lastMessageInfo.Message))
             {
-                if (ReferenceEquals(message, this.lastMessageInfo.Message))
-                {
-                    Log.Debug($"message serialize cache: {message.GetType().FullName}");
-                    return lastMessageInfo.MemoryStream;
-                }
-
-                stream = new MemoryBuffer(); // 因为广播,可能MemoryBuffer会用多次,所以不能用对象池
-                MessageSerializeHelper.MessageToStream(stream, message);
-                this.lastMessageInfo = (message, stream);
-            }
-            else
-            {
-                stream = NetServices.Instance.Fetch();
-                MessageSerializeHelper.MessageToStream(stream, message);
-                NetServices.Instance.RecycleMessage(message);
+                Log.Debug($"message serialize cache: {message.GetType().FullName}");
+                return lastMessageInfo.MemoryStream;
             }
+
+            MemoryBuffer stream = NetServices.Instance.FetchMemoryBuffer();
+            MessageSerializeHelper.MessageToStream(stream, message);
+            this.lastMessageInfo = (message, stream);
             return stream;
         }
         
-        public void Recycle(MemoryBuffer memoryStream)
+        public void Recycle(MessageObject message, MemoryBuffer memoryBuffer)
         {
-            NetServices.Instance.Recycle(memoryStream);
+            if (ReferenceEquals(message, this.lastMessageInfo.Message))
+            {
+                return;
+            }
+            NetServices.Instance.RecycleMessage(this.lastMessageInfo.Message);
+            NetServices.Instance.RecycleMemoryBuffer(this.lastMessageInfo.MemoryStream);
+
+            this.lastMessageInfo = (message, memoryBuffer);
         }
         
         public virtual void Dispose()

+ 16 - 22
Unity/Assets/Scripts/Core/Module/Network/KChannel.cs

@@ -9,10 +9,10 @@ namespace ET
 {
 
 	
-	public struct KcpWaitPacket
+	public struct KcpWaitSendMessage
 	{
 		public long ActorId;
-		public MemoryBuffer MemoryStream;
+		public MessageObject Message;
 	}
 	
 	public class KChannel : AChannel
@@ -23,7 +23,7 @@ namespace ET
 
 		public IntPtr kcp { get; private set; }
 
-		private readonly Queue<KcpWaitPacket> sendBuffer = new Queue<KcpWaitPacket>();
+		private readonly Queue<KcpWaitSendMessage> waitSendMessages = new Queue<KcpWaitSendMessage>();
 		
 		public readonly uint CreateTime;
 
@@ -164,13 +164,13 @@ namespace ET
 			
 			while (true)
 			{
-				if (this.sendBuffer.Count <= 0)
+				if (this.waitSendMessages.Count <= 0)
 				{
 					break;
 				}
 				
-				KcpWaitPacket buffer = this.sendBuffer.Dequeue();
-				this.KcpSend(buffer);
+				KcpWaitSendMessage buffer = this.waitSendMessages.Dequeue();
+				this.Send(buffer.ActorId, buffer.Message);
 			}
 		}
 
@@ -391,19 +391,18 @@ namespace ET
 			}
 		}
 
-        private void KcpSend(KcpWaitPacket kcpWaitPacket)
+        private void KcpSend(long actorId, MemoryBuffer memoryStream)
 		{
 			if (this.IsDisposed)
 			{
 				return;
 			}
-			MemoryBuffer memoryStream = kcpWaitPacket.MemoryStream;
 			
 			switch (this.Service.ServiceType)
 			{
 				case ServiceType.Inner:
 				{
-					memoryStream.GetBuffer().WriteTo(0, kcpWaitPacket.ActorId);
+					memoryStream.GetBuffer().WriteTo(0, actorId);
 					break;
 				}
 				case ServiceType.Outer:
@@ -442,27 +441,19 @@ namespace ET
 			}
 
 			this.Service.AddToUpdate(0, this.Id);
-			
-			// 回收MemoryBuffer,减少GC
-			this.Service.Recycle(memoryStream);
 		}
 		
 		public void Send(long actorId, MessageObject message)
 		{
-			KcpWaitPacket kcpWaitPacket;
-			MemoryBuffer stream;
 			if (!this.IsConnected)
 			{
-				// 没连接成功的时候MemoryBuffer不用对象池,因为这个时候可能会堆积大量消息,造成池过大
-				stream = new MemoryBuffer();
-				MessageSerializeHelper.MessageToStream(stream, message);
-				kcpWaitPacket = new KcpWaitPacket { ActorId = actorId, MemoryStream = stream };
-				this.sendBuffer.Enqueue(kcpWaitPacket);
+				KcpWaitSendMessage kcpWaitSendMessage = new() { ActorId = actorId, Message = message };
+				this.waitSendMessages.Enqueue(kcpWaitSendMessage);
 				return;
 			}
 			
-			stream = this.Service.Fetch(message);
-			kcpWaitPacket = new KcpWaitPacket { ActorId = actorId, MemoryStream = stream };
+			MemoryBuffer memoryStream = this.Service.Fetch(message);
+
 			if (this.kcp == IntPtr.Zero)
 			{
 				throw new Exception("kchannel connected but kcp is zero!");
@@ -490,7 +481,10 @@ namespace ET
 				return;
 			}
 
-			this.KcpSend(kcpWaitPacket);
+			this.KcpSend(actorId, memoryStream);
+			
+			// 回收Message跟MemoryBuffer,减少GC
+			this.Service.Recycle(message, memoryStream);
 		}
 		
 		private void OnRead(MemoryBuffer memoryStream)

+ 7 - 0
Unity/Assets/Scripts/Core/Module/Network/MessagePool.cs

@@ -12,8 +12,10 @@ namespace ET
             return this.Fetch(typeof (T)) as T;
         }
 
+        // 只有客户端才用消息池,服务端不使用
         public MessageObject Fetch(Type type)
         {
+#if UNITY
             lock (this.pool)
             {
                 MessageObject messageObject;
@@ -34,6 +36,11 @@ namespace ET
                 messageObject.IsFromPool = true;
                 return messageObject;
             }
+#else
+            return Activator.CreateInstance(type) as MessageObject;
+#endif
+            
+
         }
 
         public void Recycle(MessageObject obj)

+ 2 - 2
Unity/Assets/Scripts/Core/Module/Network/NetServices.cs

@@ -287,7 +287,7 @@ namespace ET
 
         private readonly Queue<MemoryBuffer> pool = new();
 
-        public MemoryBuffer Fetch()
+        public MemoryBuffer FetchMemoryBuffer()
         {
             if (this.pool.Count > 0)
             {
@@ -298,7 +298,7 @@ namespace ET
             return memoryBuffer;
         }
 
-        public void Recycle(MemoryBuffer memoryBuffer)
+        public void RecycleMemoryBuffer(MemoryBuffer memoryBuffer)
         {
             if (!memoryBuffer.IsFromPool)
             {

+ 1 - 1
Unity/Assets/Scripts/Core/Module/Network/TChannel.cs

@@ -130,7 +130,7 @@ namespace ET
 				}
 			}
 			
-			this.Service.Recycle(stream);
+			this.Service.Recycle(message, stream);
 			
 			if (!this.isSending)
 			{

+ 0 - 2
Unity/Assets/Scripts/Core/Module/Network/WChannel.cs

@@ -136,8 +136,6 @@ namespace ET
                     {
                         await this.webSocket.SendAsync(bytes.GetMemory(), WebSocketMessageType.Binary, true, cancellationTokenSource.Token);
                         
-                        this.Service.Recycle(bytes);
-                        
                         if (this.IsDisposed)
                         {
                             return;

+ 1 - 0
Unity/Assets/Scripts/Hotfix/Client/Demo/Ping/PingComponentSystem.cs

@@ -33,6 +33,7 @@ namespace ET.Client
                 {
                     C2G_Ping c2GPing = NetServices.Instance.FetchMessage<C2G_Ping>();
                     G2C_Ping response = await session.Call(c2GPing) as G2C_Ping;
+                    NetServices.Instance.RecycleMessage(response);
 
                     if (self.InstanceId != instanceId)
                     {

+ 1 - 1
Unity/Assets/Scripts/Model/Share/LockStep/LSConstValue.cs

@@ -2,7 +2,7 @@ namespace ET
 {
     public static class LSConstValue
     {
-        public const int MatchCount = 2;
+        public const int MatchCount = 1;
         public const int UpdateInterval = 50;
         public const int FrameCountPerSecond = 1000 / UpdateInterval;
         public const int SaveLSWorldFrameCount = 60 * FrameCountPerSecond;