Просмотр исходного кода

把消息的序列化跟反序列化提到Entity这一层来,可以做各种优化,比如消息发到gate转发到客户端,
gate可以不用反序列化出消息,直接把MemoryStream转发给客户端

tanghai 2 лет назад
Родитель
Сommit
7d37d33dfb

+ 4 - 0
DotNet/Core/DotNet.Core.csproj

@@ -39,5 +39,9 @@
       <ProjectReference Include="..\..\Share\Share.SourceGenerator\Share.SourceGenerator.csproj" OutputItemType="Analyzer" ReferenceOutputAssembly="false" />
       <ProjectReference Include="..\..\Share\Share.SourceGenerator\Share.SourceGenerator.csproj" OutputItemType="Analyzer" ReferenceOutputAssembly="false" />
       <ProjectReference Include="..\ThirdParty\DotNet.ThirdParty.csproj" />
       <ProjectReference Include="..\ThirdParty\DotNet.ThirdParty.csproj" />
     </ItemGroup>
     </ItemGroup>
+    
+    <ItemGroup>
+      <Folder Include="Core\Serialize\" />
+    </ItemGroup>
 
 
 </Project>
 </Project>

+ 2 - 2
Unity/Assets/Scripts/Core/Network/AService.cs

@@ -8,7 +8,7 @@ namespace ET
     public abstract class AService: IDisposable
     public abstract class AService: IDisposable
     {
     {
         public Action<long, IPEndPoint> AcceptCallback;
         public Action<long, IPEndPoint> AcceptCallback;
-        public Action<long, ActorId, object> ReadCallback;
+        public Action<long, MemoryBuffer> ReadCallback;
         public Action<long, int> ErrorCallback;
         public Action<long, int> ErrorCallback;
         
         
         public long Id { get; set; }
         public long Id { get; set; }
@@ -70,7 +70,7 @@ namespace ET
 
 
         public abstract void Create(long id, string address);
         public abstract void Create(long id, string address);
 
 
-        public abstract void Send(long channelId, ActorId actorId, MessageObject message);
+        public abstract void Send(long channelId, MemoryBuffer memoryBuffer);
 
 
         public virtual (uint, uint) GetChannelConn(long channelId)
         public virtual (uint, uint) GetChannelConn(long channelId)
         {
         {

+ 10 - 54
Unity/Assets/Scripts/Core/Network/KChannel.cs

@@ -3,7 +3,6 @@ using System.Collections.Generic;
 using System.IO;
 using System.IO;
 using System.Net;
 using System.Net;
 using System.Net.Sockets;
 using System.Net.Sockets;
-using System.Runtime.InteropServices;
 
 
 namespace ET
 namespace ET
 {
 {
@@ -15,7 +14,7 @@ namespace ET
 
 
 		private Kcp kcp { get; set; }
 		private Kcp kcp { get; set; }
 
 
-		private readonly Queue<MessageInfo> waitSendMessages = new();
+		private readonly Queue<MemoryBuffer> waitSendMessages = new();
 		
 		
 		public readonly uint CreateTime;
 		public readonly uint CreateTime;
 
 
@@ -163,8 +162,8 @@ namespace ET
 					break;
 					break;
 				}
 				}
 				
 				
-				MessageInfo buffer = this.waitSendMessages.Dequeue();
-				this.Send(buffer.ActorId, buffer.MessageObject);
+				MemoryBuffer buffer = this.waitSendMessages.Dequeue();
+				this.Send(buffer);
 			}
 			}
 		}
 		}
 
 
@@ -250,7 +249,7 @@ namespace ET
 			this.Service.AddToUpdate(nextUpdateTime, this.Id);
 			this.Service.AddToUpdate(nextUpdateTime, this.Id);
 		}
 		}
 
 
-		public unsafe void HandleRecv(byte[] date, int offset, int length)
+		public void HandleRecv(byte[] date, int offset, int length)
 		{
 		{
 			if (this.IsDisposed)
 			if (this.IsDisposed)
 			{
 			{
@@ -384,27 +383,13 @@ namespace ET
 			}
 			}
 		}
 		}
 
 
-        private void KcpSend(ActorId actorId, MemoryBuffer memoryStream)
+        private void KcpSend(MemoryBuffer memoryStream)
 		{
 		{
 			if (this.IsDisposed)
 			if (this.IsDisposed)
 			{
 			{
 				return;
 				return;
 			}
 			}
 			
 			
-			switch (this.Service.ServiceType)
-			{
-				case ServiceType.Inner:
-				{
-					memoryStream.GetBuffer().WriteTo(0, actorId);
-					break;
-				}
-				case ServiceType.Outer:
-				{
-					// 外网不需要发送actorId,跳过
-					memoryStream.Seek(Packet.ActorIdLength, SeekOrigin.Begin);
-					break;
-				}
-			}
 			int count = (int) (memoryStream.Length - memoryStream.Position);
 			int count = (int) (memoryStream.Length - memoryStream.Position);
 
 
 			// 超出maxPacketSize需要分片
 			// 超出maxPacketSize需要分片
@@ -437,19 +422,14 @@ namespace ET
 			this.Service.AddToUpdate(0, this.Id);
 			this.Service.AddToUpdate(0, this.Id);
 		}
 		}
 		
 		
-		public void Send(ActorId actorId, MessageObject message)
+		public void Send(MemoryBuffer memoryBuffer)
 		{
 		{
 			if (!this.IsConnected)
 			if (!this.IsConnected)
 			{
 			{
-				MessageInfo messageInfo = new() { ActorId = actorId, MessageObject = message };
-				this.waitSendMessages.Enqueue(messageInfo);
+				this.waitSendMessages.Enqueue(memoryBuffer);
 				return;
 				return;
 			}
 			}
 
 
-			MemoryBuffer stream = this.Service.Fetch();
-			MessageSerializeHelper.MessageToStream(stream, message);
-			message.Dispose();
-
 			if (this.kcp == null)
 			if (this.kcp == null)
 			{
 			{
 				throw new Exception("kchannel connected but kcp is zero!");
 				throw new Exception("kchannel connected but kcp is zero!");
@@ -477,40 +457,16 @@ namespace ET
 				return;
 				return;
 			}
 			}
 
 
-			this.KcpSend(actorId, stream);
+			this.KcpSend(memoryBuffer);
 			
 			
-			this.Service.Recycle(stream);
+			this.Service.Recycle(memoryBuffer);
 		}
 		}
 		
 		
 		private void OnRead(MemoryBuffer memoryStream)
 		private void OnRead(MemoryBuffer memoryStream)
 		{
 		{
 			try
 			try
 			{
 			{
-				long channelId = this.Id;
-				object message = null;
-				ActorId actorId = default;
-				switch (this.Service.ServiceType)
-				{
-					case ServiceType.Outer:
-					{
-						ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.KcpOpcodeIndex);
-						Type type = OpcodeType.Instance.GetType(opcode);
-						message = MessageSerializeHelper.Deserialize(type, memoryStream);
-						break;
-					}
-					case ServiceType.Inner:
-					{
-						byte[] buffer = memoryStream.GetBuffer();
-						actorId.Process = BitConverter.ToInt32(buffer, Packet.ActorIdIndex);
-						actorId.Fiber = BitConverter.ToInt32(buffer, Packet.ActorIdIndex + 4);
-						actorId.InstanceId = BitConverter.ToInt64(buffer, Packet.ActorIdIndex + 8);
-						ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.OpcodeIndex);
-						Type type = OpcodeType.Instance.GetType(opcode);
-						message = MessageSerializeHelper.Deserialize(type, memoryStream);
-						break;
-					}
-				}
-				this.Service.ReadCallback(channelId, actorId, message);
+				this.Service.ReadCallback(this.Id, memoryStream);
 			}
 			}
 			catch (Exception e)
 			catch (Exception e)
 			{
 			{

+ 2 - 2
Unity/Assets/Scripts/Core/Network/KService.cs

@@ -468,7 +468,7 @@ namespace ET
             Log.Info($"channel send fin: {localConn} {remoteConn} {address} {error}");
             Log.Info($"channel send fin: {localConn} {remoteConn} {address} {error}");
         }
         }
         
         
-        public override void Send(long channelId, ActorId actorId, MessageObject message)
+        public override void Send(long channelId, MemoryBuffer memoryBuffer)
         {
         {
             KChannel channel = this.Get(channelId);
             KChannel channel = this.Get(channelId);
             if (channel == null)
             if (channel == null)
@@ -476,7 +476,7 @@ namespace ET
                 return;
                 return;
             }
             }
             
             
-            channel.Send(actorId, message);
+            channel.Send(memoryBuffer);
         }
         }
 
 
         public override void Update()
         public override void Update()

+ 56 - 3
Unity/Assets/Scripts/Core/Network/MessageSerializeHelper.cs

@@ -29,10 +29,8 @@ namespace ET
             return o as MessageObject;
             return o as MessageObject;
         }
         }
         
         
-        public static ushort MessageToStream(MemoryBuffer stream, MessageObject message)
+        public static ushort MessageToStream(MemoryBuffer stream, MessageObject message, int headOffset = 0)
         {
         {
-            int headOffset = Packet.ActorIdLength;
-
             ushort opcode = OpcodeType.Instance.GetOpcode(message.GetType());
             ushort opcode = OpcodeType.Instance.GetOpcode(message.GetType());
             
             
             stream.Seek(headOffset + Packet.OpcodeLength, SeekOrigin.Begin);
             stream.Seek(headOffset + Packet.OpcodeLength, SeekOrigin.Begin);
@@ -45,5 +43,60 @@ namespace ET
             stream.Seek(0, SeekOrigin.Begin);
             stream.Seek(0, SeekOrigin.Begin);
             return opcode;
             return opcode;
         }
         }
+        
+        public static (ushort, MemoryBuffer) ToMemoryBuffer(AService service, ActorId actorId, object message)
+        {
+            MemoryBuffer memoryBuffer = service.Fetch();
+            ushort opcode = 0;
+            switch (service.ServiceType)
+            {
+                case ServiceType.Inner:
+                {
+                    opcode = MessageToStream(memoryBuffer, (MessageObject)message, Packet.ActorIdLength);
+                    memoryBuffer.GetBuffer().WriteTo(0, actorId);
+                    break;
+                }
+                case ServiceType.Outer:
+                {
+                    opcode = MessageToStream(memoryBuffer, (MessageObject)message);
+                    break;
+                }
+            }
+            
+            ((MessageObject)message).Dispose(); // 回收message
+            
+            return (opcode, memoryBuffer);
+        }
+        
+        public static (ActorId, object) ToMessage(AService service, MemoryBuffer memoryStream)
+        {
+            object message = null;
+            ActorId actorId = default;
+            switch (service.ServiceType)
+            {
+                case ServiceType.Outer:
+                {
+                    ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.KcpOpcodeIndex);
+                    Type type = OpcodeType.Instance.GetType(opcode);
+                    message = Deserialize(type, memoryStream);
+                    break;
+                }
+                case ServiceType.Inner:
+                {
+                    byte[] buffer = memoryStream.GetBuffer();
+                    actorId.Process = BitConverter.ToInt32(buffer, Packet.ActorIdIndex);
+                    actorId.Fiber = BitConverter.ToInt32(buffer, Packet.ActorIdIndex + 4);
+                    actorId.InstanceId = BitConverter.ToInt64(buffer, Packet.ActorIdIndex + 8);
+                    ushort opcode = BitConverter.ToUInt16(buffer, Packet.OpcodeIndex);
+                    Type type = OpcodeType.Instance.GetType(opcode);
+                    message = Deserialize(type, memoryStream);
+                    break;
+                }
+            }
+            
+            service.Recycle(memoryStream);
+            
+            return (actorId, message);
+        }
     }
     }
 }
 }

+ 5 - 63
Unity/Assets/Scripts/Core/Network/TChannel.cs

@@ -92,47 +92,14 @@ namespace ET
 			this.socket = null;
 			this.socket = null;
 		}
 		}
 
 
-		public void Send(ActorId actorId, MessageObject message)
+		public void Send(MemoryBuffer stream)
 		{
 		{
 			if (this.IsDisposed)
 			if (this.IsDisposed)
 			{
 			{
 				throw new Exception("TChannel已经被Dispose, 不能发送消息");
 				throw new Exception("TChannel已经被Dispose, 不能发送消息");
 			}
 			}
-
-			MemoryBuffer stream = this.Service.Fetch();
-			MessageSerializeHelper.MessageToStream(stream, message);
-			message.Dispose();
-
-			switch (this.Service.ServiceType)
-			{
-				case ServiceType.Inner:
-				{
-					int messageSize = (int) (stream.Length - stream.Position);
-					if (messageSize > ushort.MaxValue * 16)
-					{
-						throw new Exception($"send packet too large: {stream.Length} {stream.Position}");
-					}
-
-					this.sendCache.WriteTo(0, messageSize);
-					this.sendBuffer.Write(this.sendCache, 0, PacketParser.InnerPacketSizeLength);
-
-					// actorId
-					stream.GetBuffer().WriteTo(0, actorId);
-					this.sendBuffer.Write(stream.GetBuffer(), (int)stream.Position, (int)(stream.Length - stream.Position));
-					break;
-				}
-				case ServiceType.Outer:
-				{
-					stream.Seek(Packet.ActorIdLength, SeekOrigin.Begin); // 外网不需要actorId
-					ushort messageSize = (ushort) (stream.Length - stream.Position);
-
-					this.sendCache.WriteTo(0, messageSize);
-					this.sendBuffer.Write(this.sendCache, 0, PacketParser.OuterPacketSizeLength);
-					
-					this.sendBuffer.Write(stream.GetBuffer(), (int)stream.Position, (int)(stream.Length - stream.Position));
-					break;
-				}
-			}
+			
+			this.sendBuffer.Write(stream.GetBuffer(), (int)stream.Position, (int)(stream.Length - stream.Position));
 			
 			
 			if (!this.isSending)
 			if (!this.isSending)
 			{
 			{
@@ -366,36 +333,11 @@ namespace ET
 		{
 		{
 			try
 			try
 			{
 			{
-				long channelId = this.Id;
-				object message = null;
-				ActorId actorId = default;
-				switch (this.Service.ServiceType)
-				{
-					case ServiceType.Outer:
-					{
-						ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.KcpOpcodeIndex);
-						Type type = OpcodeType.Instance.GetType(opcode);
-						message = MessageSerializeHelper.Deserialize(type, memoryStream);
-						break;
-					}
-					case ServiceType.Inner:
-					{
-						byte[] buffer = memoryStream.GetBuffer();
-						actorId.Process = BitConverter.ToInt32(buffer, Packet.ActorIdIndex);
-						actorId.Fiber = BitConverter.ToInt32(buffer, Packet.ActorIdIndex + 4);
-						actorId.InstanceId = BitConverter.ToInt64(buffer, Packet.ActorIdIndex + 8);
-						ushort opcode = BitConverter.ToUInt16(buffer, Packet.OpcodeIndex);
-						Type type = OpcodeType.Instance.GetType(opcode);
-						message = MessageSerializeHelper.Deserialize(type, memoryStream);
-						break;
-					}
-				}
-				this.Service.ReadCallback(channelId, actorId, message);
+				this.Service.ReadCallback(this.Id, memoryStream);
 			}
 			}
 			catch (Exception e)
 			catch (Exception e)
 			{
 			{
-				Log.Error($"{this.RemoteAddress} {memoryStream.Length} {e}");
-				// 出现任何消息解析异常都要断开Session,防止客户端伪造消息
+				Log.Error(e);
 				this.OnError(ErrorCore.ERR_PacketParserError);
 				this.OnError(ErrorCore.ERR_PacketParserError);
 			}
 			}
 		}
 		}

+ 2 - 2
Unity/Assets/Scripts/Core/Network/TService.cs

@@ -157,7 +157,7 @@ namespace ET
 			this.idChannels.Remove(id);
 			this.idChannels.Remove(id);
 		}
 		}
 
 
-		public override void Send(long channelId, ActorId actorId, MessageObject message)
+		public override void Send(long channelId, MemoryBuffer memoryBuffer)
 		{
 		{
 			try
 			try
 			{
 			{
@@ -168,7 +168,7 @@ namespace ET
 					return;
 					return;
 				}
 				}
 				
 				
-				aChannel.Send(actorId, message);
+				aChannel.Send(memoryBuffer);
 			}
 			}
 			catch (Exception e)
 			catch (Exception e)
 			{
 			{

+ 6 - 33
Unity/Assets/Scripts/Core/Network/WChannel.cs

@@ -16,7 +16,7 @@ namespace ET
 
 
         private readonly WebSocket webSocket;
         private readonly WebSocket webSocket;
 
 
-        private readonly Queue<MessageObject> queue = new();
+        private readonly Queue<MemoryBuffer> queue = new();
 
 
         private bool isSending;
         private bool isSending;
 
 
@@ -86,9 +86,9 @@ namespace ET
             }
             }
         }
         }
 
 
-        public void Send(MessageObject message)
+        public void Send(MemoryBuffer memoryBuffer)
         {
         {
-            this.queue.Enqueue(message);
+            this.queue.Enqueue(memoryBuffer);
 
 
             if (this.isConnected)
             if (this.isConnected)
             {
             {
@@ -120,22 +120,8 @@ namespace ET
                         return;
                         return;
                     }
                     }
 
 
-                    MessageObject message = this.queue.Dequeue();
-
-                    MemoryBuffer stream = this.Service.Fetch();
-                    
-                    MessageSerializeHelper.MessageToStream(stream, message);
-                    message.Dispose();
+                    MemoryBuffer stream = this.queue.Dequeue();
             
             
-                    switch (this.Service.ServiceType)
-                    {
-                        case ServiceType.Inner:
-                            break;
-                        case ServiceType.Outer:
-                            stream.Seek(Packet.ActorIdLength, SeekOrigin.Begin);
-                            break;
-                    }
-                    
                     try
                     try
                     {
                     {
                         await this.webSocket.SendAsync(stream.GetMemory(), WebSocketMessageType.Binary, true, cancellationTokenSource.Token);
                         await this.webSocket.SendAsync(stream.GetMemory(), WebSocketMessageType.Binary, true, cancellationTokenSource.Token);
@@ -227,24 +213,11 @@ namespace ET
         {
         {
             try
             try
             {
             {
-                long channelId = this.Id;
-                object message = null;
-                switch (this.Service.ServiceType)
-                {
-                    case ServiceType.Outer:
-                    {
-                        ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.KcpOpcodeIndex);
-                        Type type = OpcodeType.Instance.GetType(opcode);
-                        message = MessageSerializeHelper.Deserialize(type, memoryStream);
-                        break;
-                    }
-                }
-                this.Service.ReadCallback(channelId, new ActorId(), message);
+                this.Service.ReadCallback(this.Id, memoryStream);
             }
             }
             catch (Exception e)
             catch (Exception e)
             {
             {
-                Log.Error($"{this.RemoteAddress} {memoryStream.Length} {e}");
-                // 出现任何消息解析异常都要断开Session,防止客户端伪造消息
+                Log.Error(e);
                 this.OnError(ErrorCore.ERR_PacketParserError);
                 this.OnError(ErrorCore.ERR_PacketParserError);
             }
             }
         }
         }

+ 2 - 2
Unity/Assets/Scripts/Core/Network/WService.cs

@@ -131,14 +131,14 @@ namespace ET
             }
             }
         }
         }
 
 
-        public override void Send(long channelId, ActorId actorId, MessageObject message)
+        public override void Send(long channelId, MemoryBuffer memoryBuffer)
         {
         {
             this.channels.TryGetValue(channelId, out WChannel channel);
             this.channels.TryGetValue(channelId, out WChannel channel);
             if (channel == null)
             if (channel == null)
             {
             {
                 return;
                 return;
             }
             }
-            channel.Send(message);
+            channel.Send(memoryBuffer);
         }
         }
     }
     }
 }
 }

+ 3 - 1
Unity/Assets/Scripts/Hotfix/Server/Module/Message/ProcessOuterSenderSystem.cs

@@ -42,7 +42,7 @@ namespace ET.Server
             self.AService.Dispose();
             self.AService.Dispose();
         }
         }
 
 
-        private static void OnRead(this ProcessOuterSender self, long channelId, ActorId actorId, object message)
+        private static void OnRead(this ProcessOuterSender self, long channelId, MemoryBuffer memoryBuffer)
         {
         {
             Session session = self.GetChild<Session>(channelId);
             Session session = self.GetChild<Session>(channelId);
             if (session == null)
             if (session == null)
@@ -51,6 +51,8 @@ namespace ET.Server
             }
             }
             
             
             session.LastRecvTime = TimeInfo.Instance.ClientFrameTime();
             session.LastRecvTime = TimeInfo.Instance.ClientFrameTime();
+
+            (ActorId actorId, object message) = MessageSerializeHelper.ToMessage(self.AService, memoryBuffer);
             
             
             if (message is IResponse response)
             if (message is IResponse response)
             {
             {

+ 3 - 1
Unity/Assets/Scripts/Hotfix/Share/Module/Message/NetComponentSystem.cs

@@ -63,7 +63,7 @@ namespace ET
             }
             }
         }
         }
         
         
-        private static void OnRead(this NetComponent self, long channelId, ActorId actorId, object message)
+        private static void OnRead(this NetComponent self, long channelId, MemoryBuffer memoryBuffer)
         {
         {
             Session session = self.GetChild<Session>(channelId);
             Session session = self.GetChild<Session>(channelId);
             if (session == null)
             if (session == null)
@@ -72,6 +72,8 @@ namespace ET
             }
             }
             session.LastRecvTime = TimeInfo.Instance.ClientNow();
             session.LastRecvTime = TimeInfo.Instance.ClientNow();
             
             
+            (ActorId _, object message) = MessageSerializeHelper.ToMessage(self.AService, memoryBuffer);
+            
             LogMsg.Instance.Debug(self.Fiber(), message);
             LogMsg.Instance.Debug(self.Fiber(), message);
             
             
             EventSystem.Instance.Invoke((long)self.IScene.SceneType, new NetComponentOnRead() {Session = session, Message = message});
             EventSystem.Instance.Invoke((long)self.IScene.SceneType, new NetComponentOnRead() {Session = session, Message = message});

+ 5 - 1
Unity/Assets/Scripts/Model/Share/Module/Message/Session.cs

@@ -1,5 +1,6 @@
 using System;
 using System;
 using System.Collections.Generic;
 using System.Collections.Generic;
+using System.IO;
 using System.Linq;
 using System.Linq;
 using System.Net;
 using System.Net;
 
 
@@ -135,7 +136,10 @@ namespace ET
         {
         {
             self.LastSendTime = TimeInfo.Instance.ClientNow();
             self.LastSendTime = TimeInfo.Instance.ClientNow();
             LogMsg.Instance.Debug(self.Fiber(), message);
             LogMsg.Instance.Debug(self.Fiber(), message);
-            self.AService.Send(self.Id, actorId, message as MessageObject);
+
+            (ushort opcode, MemoryBuffer memoryBuffer) = MessageSerializeHelper.ToMemoryBuffer(self.AService, actorId, message);
+            
+            self.AService.Send(self.Id, memoryBuffer);
         }
         }
     }
     }