Просмотр исходного кода

1.消除发送消息GC
2.kcp收到远程断开消息,需要调用OnError删除相应的Session

tanghai 7 лет назад
Родитель
Сommit
0c5f0114c5

+ 4 - 4
Server/Hotfix/Module/Message/OuterMessageDispatcher.cs

@@ -11,9 +11,9 @@ namespace ETHotfix
 			object message;
 			try
 			{
-				Type messageType = session.Network.Entity.GetComponent<OpcodeTypeComponent>().GetType(packet.Opcode);
-				message = session.Network.MessagePacker.DeserializeFrom(messageType, packet.Stream);
-				
+				OpcodeTypeComponent opcodeTypeComponent = session.Network.Entity.GetComponent<OpcodeTypeComponent>();
+				object instance = opcodeTypeComponent.GetInstance(packet.Opcode);
+				message = session.Network.MessagePacker.DeserializeFrom(instance, packet.Stream);
 			}
 			catch (Exception e)
 			{
@@ -39,7 +39,7 @@ namespace ETHotfix
 					OneFrameMessage oneFrameMessage = new OneFrameMessage
 					{
 						Op = packet.Opcode,
-						AMessage = ByteString.CopyFrom(session.Network.MessagePacker.SerializeToByteArray(iFrameMessage))
+						AMessage = ByteString.CopyFrom(session.Network.MessagePacker.SerializeTo(iFrameMessage))
 					};
 					actorMessageSender.Send(oneFrameMessage);
 					return;

+ 7 - 0
Server/Model/Base/Helper/MongoHelper.cs

@@ -53,6 +53,13 @@ namespace ETModel
 			return obj.ToBson();
 		}
 		
+		public static void ToBson(object obj, MemoryStream stream)
+		{
+			byte[] bytes = obj.ToBson();
+			stream.Write(bytes);
+			stream.Seek(0, SeekOrigin.Begin);
+		}
+		
 		public static object FromBson(Type type, byte[] bytes)
 		{
 			return BsonSerializer.Deserialize(bytes, type);

+ 6 - 1
Server/Model/Module/Message/MongoPacker.cs

@@ -5,11 +5,16 @@ namespace ETModel
 {
 	public class MongoPacker: IMessagePacker
 	{
-		public byte[] SerializeToByteArray(object obj)
+		public byte[] SerializeTo(object obj)
 		{
 			return MongoHelper.ToBson(obj);
 		}
 
+		public void SerializeTo(object obj, MemoryStream stream)
+		{
+			MongoHelper.ToBson(obj, stream);
+		}
+
 		public object DeserializeFrom(Type type, byte[] bytes, int index, int count)
 		{
 			return MongoHelper.FromBson(type, bytes, index, count);

+ 1 - 0
Unity/Assets/Scripts/Module/Message/ErrorCode.cs

@@ -27,6 +27,7 @@ namespace ETModel
 		public const int ERR_ActorLocationNotFound = 202004;
 		public const int ERR_KcpConnectFail = 202005;
 		public const int ERR_KcpTimeout = 202006;
+		public const int ERR_KcpRemoteDisconnect = 202007;
 
 		public static bool IsRpcNeedThrowException(int error)
 		{

+ 2 - 1
Unity/Assets/Scripts/Module/Message/IMessagePacker.cs

@@ -5,7 +5,8 @@ namespace ETModel
 {
 	public interface IMessagePacker
 	{
-		byte[] SerializeToByteArray(object obj);
+		byte[] SerializeTo(object obj);
+		void SerializeTo(object obj, MemoryStream stream);
 		object DeserializeFrom(Type type, byte[] bytes, int index, int count);
 		object DeserializeFrom(object instance, byte[] bytes, int index, int count);
 		object DeserializeFrom(Type type, MemoryStream stream);

+ 7 - 1
Unity/Assets/Scripts/Module/Message/Network/AChannel.cs

@@ -1,5 +1,6 @@
 using System;
 using System.Collections.Generic;
+using System.IO;
 using System.Net;
 
 namespace ETModel
@@ -25,6 +26,10 @@ namespace ETModel
 
 		protected AService service;
 
+		public abstract MemoryStream Stream { get; }
+		
+		public int Error { get; set; }
+
 		public IPEndPoint RemoteAddress { get; protected set; }
 
 		private Action<AChannel, int> errorCallback;
@@ -62,6 +67,7 @@ namespace ETModel
 
 		protected void OnError(int e)
 		{
+			this.Error = e;
 			this.errorCallback?.Invoke(this, e);
 		}
 
@@ -79,7 +85,7 @@ namespace ETModel
 		/// </summary>
 		public abstract void Send(byte[] buffer, int index, int length);
 
-		public abstract void Send(List<byte[]> buffers);
+		public abstract void Send(MemoryStream stream);
 		
 		public override void Dispose()
 		{

+ 58 - 1
Unity/Assets/Scripts/Module/Message/Network/Circularbuffer.cs

@@ -114,7 +114,64 @@ namespace ETModel
 		    }
 		}
 
-		/// <summary>
+	    public void WriteTo(Stream stream, int count)
+	    {
+		    if (count > this.Length)
+		    {
+			    throw new Exception($"bufferList length < count, {Length} {count}");
+		    }
+
+		    int alreadyCopyCount = 0;
+		    while (alreadyCopyCount < count)
+		    {
+			    int n = count - alreadyCopyCount;
+			    if (ChunkSize - this.FirstIndex > n)
+			    {
+				    stream.Write(this.First, this.FirstIndex, n);
+				    this.FirstIndex += n;
+				    alreadyCopyCount += n;
+			    }
+			    else
+			    {
+				    stream.Write(this.First, this.FirstIndex, ChunkSize - this.FirstIndex);
+				    alreadyCopyCount += ChunkSize - this.FirstIndex;
+				    this.FirstIndex = 0;
+				    this.RemoveFirst();
+			    }
+		    }
+	    }
+	    
+	    public void ReadFrom(Stream stream)
+		{
+			int count = (int)(stream.Length - stream.Position);
+			
+			int alreadyCopyCount = 0;
+			while (alreadyCopyCount < count)
+			{
+				if (this.LastIndex == ChunkSize)
+				{
+					this.AddLast();
+					this.LastIndex = 0;
+				}
+
+				int n = count - alreadyCopyCount;
+				if (ChunkSize - this.LastIndex > n)
+				{
+					stream.Read(this.lastBuffer, this.LastIndex, n);
+					this.LastIndex += count - alreadyCopyCount;
+					alreadyCopyCount += n;
+				}
+				else
+				{
+					stream.Read(this.lastBuffer, this.LastIndex, ChunkSize - this.LastIndex);
+					alreadyCopyCount += ChunkSize - this.LastIndex;
+					this.LastIndex = ChunkSize;
+				}
+			}
+		}
+	    
+
+	    /// <summary>
 		/// 从stream流读到CircularBuffer中
 		/// </summary>
 		/// <param name="stream"></param>

+ 27 - 19
Unity/Assets/Scripts/Module/Message/Network/KCP/KChannel.cs

@@ -70,6 +70,23 @@ namespace ETModel
 			this.Connect(kService.TimeNow);
 		}
 
+		public override void Send(MemoryStream stream)
+		{
+			ushort size = (ushort)(stream.Length - stream.Position);
+			byte[] bytes;
+			if (this.isConnected)
+			{
+				bytes = stream.GetBuffer();
+			}
+			else
+			{
+				bytes = new byte[size];
+				Array.Copy(stream.GetBuffer(), stream.Position, bytes, 0, size);
+			}
+
+			Send(bytes, 0, size);
+		}
+
 		public override void Dispose()
 		{
 			if (this.IsDisposed)
@@ -92,6 +109,11 @@ namespace ETModel
 			return (KService)this.service;
 		}
 
+		public void HandleDisConnect()
+		{
+			this.OnError(ErrorCode.ERR_KcpRemoteDisconnect);
+		}
+
 		public void HandleConnnect(uint responseConn)
 		{
 			if (this.isConnected)
@@ -235,28 +257,14 @@ namespace ETModel
 			
 			this.sendBuffer.Enqueue(new WaitSendBuffer(buffer, index, length));
 		}
-
-		public override void Send(List<byte[]> buffers)
+		
+		public override MemoryStream Stream
 		{
-			ushort size = (ushort)buffers.Select(b => b.Length).Sum();
-			byte[] bytes;
-			if (this.isConnected)
+			get
 			{
-				bytes = this.packet.Bytes;
+				return this.packet.Stream;
 			}
-			else
-			{
-				bytes = new byte[size];
-			}
-
-			int index = 0;
-			foreach (byte[] buffer in buffers)
-			{
-				Array.Copy(buffer, 0, bytes, index, buffer.Length);
-				index += buffer.Length;
-			}
-
-			Send(bytes, 0, size);
 		}
+
 	}
 }

+ 1 - 3
Unity/Assets/Scripts/Module/Message/Network/KCP/KService.cs

@@ -185,9 +185,7 @@ namespace ETModel
 				return;
 			}
 
-			// 处理chanel
-			this.idChannels.Remove(requestConn);
-			kChannel.Dispose();
+			kChannel.HandleDisConnect();
 		}
 
 		private void HandleRecv(byte[] bytes, int length, uint conn)

+ 15 - 8
Unity/Assets/Scripts/Module/Message/Network/TCP/TChannel.cs

@@ -1,5 +1,6 @@
 using System;
 using System.Collections.Generic;
+using System.IO;
 using System.Linq;
 using System.Net;
 using System.Net.Sockets;
@@ -51,7 +52,7 @@ namespace ETModel
 			this.isConnected = true;
 			this.isSending = false;
 		}
-		
+
 		public override void Dispose()
 		{
 			if (this.IsDisposed)
@@ -69,6 +70,14 @@ namespace ETModel
 			this.socket = null;
 		}
 
+		public override MemoryStream Stream
+		{
+			get
+			{
+				return this.parser.packet.Stream;
+			}
+		}
+
 		public override void Start()
 		{
 			if (!this.isConnected)
@@ -96,20 +105,18 @@ namespace ETModel
 				this.StartSend();
 			}
 		}
-
-		public override void Send(List<byte[]> buffers)
+		
+		public override void Send(MemoryStream stream)
 		{
 			if (this.IsDisposed)
 			{
 				throw new Exception("TChannel已经被Dispose, 不能发送消息");
 			}
-			ushort size = (ushort)buffers.Select(b => b.Length).Sum();
+
+			ushort size = (ushort)(stream.Length - stream.Position);
 			byte[] sizeBuffer = BitConverter.GetBytes(size);
 			this.sendBuffer.Write(sizeBuffer, 0, sizeBuffer.Length);
-			foreach (byte[] buffer in buffers)
-			{
-				this.sendBuffer.Write(buffer, 0, buffer.Length);
-			}
+			this.sendBuffer.ReadFrom(stream);
 
 			if(!this.isSending)
 			{

+ 5 - 0
Unity/Assets/Scripts/Module/Message/ProtobufHelper.cs

@@ -12,6 +12,11 @@ namespace ETModel
 			return ((Google.Protobuf.IMessage) message).ToByteArray();
 		}
 		
+		public static void ToStream(object message, MemoryStream stream)
+		{
+			((Google.Protobuf.IMessage) message).WriteTo(stream);
+		}
+		
 		public static object FromBytes(Type type, byte[] bytes, int index, int count)
 		{
 			object message = Activator.CreateInstance(type);

+ 6 - 1
Unity/Assets/Scripts/Module/Message/ProtobufPacker.cs

@@ -5,11 +5,16 @@ namespace ETModel
 {
 	public class ProtobufPacker : IMessagePacker
 	{
-		public byte[] SerializeToByteArray(object obj)
+		public byte[] SerializeTo(object obj)
 		{
 			return ProtobufHelper.ToBytes(obj);
 		}
 
+		public void SerializeTo(object obj, MemoryStream stream)
+		{
+			ProtobufHelper.ToStream(obj, stream);
+		}
+
 		public object DeserializeFrom(Type type, byte[] bytes, int index, int count)
 		{
 			return ProtobufHelper.FromBytes(type, bytes, index, count);

+ 53 - 16
Unity/Assets/Scripts/Module/Message/Session.cs

@@ -22,10 +22,9 @@ namespace ETModel
 	{
 		private static int RpcId { get; set; }
 		private AChannel channel;
-		public int Error;
 
 		private readonly Dictionary<int, Action<IResponse>> requestCallback = new Dictionary<int, Action<IResponse>>();
-		private readonly List<byte[]> byteses = new List<byte[]>() { new byte[1], new byte[0], new byte[0]};
+		private readonly List<byte[]> byteses = new List<byte[]>() { new byte[1], new byte[0] };
 
 		public NetworkComponent Network
 		{
@@ -35,15 +34,25 @@ namespace ETModel
 			}
 		}
 
+		public int Error
+		{
+			get
+			{
+				return this.channel.Error;
+			}
+			set
+			{
+				this.channel.Error = value;
+			}
+		}
+
 		public void Awake(AChannel aChannel)
 		{
-			this.Error = 0;
 			this.channel = aChannel;
 			this.requestCallback.Clear();
 			long id = this.Id;
 			channel.ErrorCallback += (c, e) =>
 			{
-				this.Error = e;
 				this.Network.Remove(id); 
 			};
 			channel.ReadCallback += this.OnRead;
@@ -66,12 +75,12 @@ namespace ETModel
 				action.Invoke(new ResponseMessage { Error = this.Error });
 			}
 
-			if (this.Error != 0)
+			int error = this.channel.Error;
+			if (this.channel.Error != 0)
 			{
-				Log.Error($"session dispose: {this.Id} {this.Error}");
+				Log.Error($"session dispose: {this.Id} {error}");
 			}
-
-			this.Error = 0;
+			
 			this.channel.Dispose();
 			this.Network.Remove(id);
 			this.requestCallback.Clear();
@@ -93,6 +102,14 @@ namespace ETModel
 			}
 		}
 
+		public MemoryStream Stream
+		{
+			get
+			{
+				return this.channel.Stream;
+			}
+		}
+
 		public void OnRead(Packet packet)
 		{
 			try
@@ -232,11 +249,11 @@ namespace ETModel
 		{
 			OpcodeTypeComponent opcodeTypeComponent = this.Network.Entity.GetComponent<OpcodeTypeComponent>();
 			ushort opcode = opcodeTypeComponent.GetOpcode(message.GetType());
-			byte[] bytes = this.Network.MessagePacker.SerializeToByteArray(message);
-			Send(flag, opcode, bytes);
+			
+			Send(flag, opcode, message);
 		}
-
-		public void Send(byte flag, ushort opcode, byte[] bytes)
+		
+		public void Send(byte flag, ushort opcode, object message)
 		{
 			if (this.IsDisposed)
 			{
@@ -244,7 +261,22 @@ namespace ETModel
 			}
 			this.byteses[0][0] = flag;
 			this.byteses[1] = BitConverter.GetBytes(opcode);
-			this.byteses[2] = bytes;
+
+			MemoryStream stream = this.Stream;
+			
+			int index = Packet.Index;
+			stream.Seek(index, SeekOrigin.Begin);
+			stream.SetLength(index);
+			var  bb = this.Network.MessagePacker.SerializeTo(message);
+			this.Network.MessagePacker.SerializeTo(message, stream);
+			
+			stream.Seek(0, SeekOrigin.Begin);
+			index = 0;
+			foreach (var bytes in this.byteses)
+			{
+				Array.Copy(bytes, 0, stream.GetBuffer(), index, bytes.Length);
+				index += bytes.Length;
+			}
 
 #if SERVER
 			// 如果是allserver,内部消息不走网络,直接转给session,方便调试时看到整体堆栈
@@ -257,14 +289,19 @@ namespace ETModel
 				packet.Flag = flag;
 				packet.Opcode = opcode;
 				packet.Stream.Seek(0, SeekOrigin.Begin);
-				packet.Stream.SetLength(bytes.Length);
-				Array.Copy(bytes, 0, packet.Bytes, 0, bytes.Length);
+				packet.Stream.SetLength(0);
+				this.Network.MessagePacker.SerializeTo(message, stream);
 				session.Run(packet);
 				return;
 			}
 #endif
 
-			channel.Send(this.byteses);
+			this.Send(stream);
+		}
+
+		public void Send(MemoryStream stream)
+		{
+			channel.Send(stream);
 		}
 	}
 }

+ 0 - 1
Unity/Assets/ThirdParty/ILRuntime/Generated/CLRBindings.cs

@@ -27,7 +27,6 @@ namespace ILRuntime.Runtime.Generated
             ETModel_SessionComponent_Binding.Register(app);
             ETModel_Frame_ClickMap_Binding.Register(app);
             UnityEngine_Vector3_Binding.Register(app);
-            ETModel_Session_Binding.Register(app);
             System_Runtime_CompilerServices_AsyncVoidMethodBuilder_Binding.Register(app);
             System_Threading_Tasks_Task_1_ILTypeInstance_Binding.Register(app);
             System_Runtime_CompilerServices_TaskAwaiter_1_ILTypeInstance_Binding.Register(app);

+ 0 - 126
Unity/Assets/ThirdParty/ILRuntime/Generated/ETModel_Session_Binding.cs

@@ -15,132 +15,6 @@ namespace ILRuntime.Runtime.Generated
 {
     unsafe class ETModel_Session_Binding
     {
-        public static void Register(ILRuntime.Runtime.Enviorment.AppDomain app)
-        {
-            BindingFlags flag = BindingFlags.Public | BindingFlags.Instance | BindingFlags.Static | BindingFlags.DeclaredOnly;
-            MethodBase method;
-            FieldInfo field;
-            Type[] args;
-            Type type = typeof(ETModel.Session);
-            args = new Type[]{typeof(ETModel.IMessage)};
-            method = type.GetMethod("Send", flag, null, args, null);
-            app.RegisterCLRMethodRedirection(method, Send_0);
-            args = new Type[]{};
-            method = type.GetMethod("get_Network", flag, null, args, null);
-            app.RegisterCLRMethodRedirection(method, get_Network_1);
-            args = new Type[]{typeof(System.Byte), typeof(System.UInt16), typeof(System.Byte[])};
-            method = type.GetMethod("Send", flag, null, args, null);
-            app.RegisterCLRMethodRedirection(method, Send_2);
-            args = new Type[]{typeof(ETModel.IRequest)};
-            method = type.GetMethod("Call", flag, null, args, null);
-            app.RegisterCLRMethodRedirection(method, Call_3);
-
-            field = type.GetField("Error", flag);
-            app.RegisterCLRFieldGetter(field, get_Error_0);
-            app.RegisterCLRFieldSetter(field, set_Error_0);
-
-
-        }
-
-
-        static StackObject* Send_0(ILIntepreter __intp, StackObject* __esp, IList<object> __mStack, CLRMethod __method, bool isNewObj)
-        {
-            ILRuntime.Runtime.Enviorment.AppDomain __domain = __intp.AppDomain;
-            StackObject* ptr_of_this_method;
-            StackObject* __ret = ILIntepreter.Minus(__esp, 2);
-
-            ptr_of_this_method = ILIntepreter.Minus(__esp, 1);
-            ETModel.IMessage @message = (ETModel.IMessage)typeof(ETModel.IMessage).CheckCLRTypes(StackObject.ToObject(ptr_of_this_method, __domain, __mStack));
-            __intp.Free(ptr_of_this_method);
-
-            ptr_of_this_method = ILIntepreter.Minus(__esp, 2);
-            ETModel.Session instance_of_this_method = (ETModel.Session)typeof(ETModel.Session).CheckCLRTypes(StackObject.ToObject(ptr_of_this_method, __domain, __mStack));
-            __intp.Free(ptr_of_this_method);
-
-            instance_of_this_method.Send(@message);
-
-            return __ret;
-        }
-
-        static StackObject* get_Network_1(ILIntepreter __intp, StackObject* __esp, IList<object> __mStack, CLRMethod __method, bool isNewObj)
-        {
-            ILRuntime.Runtime.Enviorment.AppDomain __domain = __intp.AppDomain;
-            StackObject* ptr_of_this_method;
-            StackObject* __ret = ILIntepreter.Minus(__esp, 1);
-
-            ptr_of_this_method = ILIntepreter.Minus(__esp, 1);
-            ETModel.Session instance_of_this_method = (ETModel.Session)typeof(ETModel.Session).CheckCLRTypes(StackObject.ToObject(ptr_of_this_method, __domain, __mStack));
-            __intp.Free(ptr_of_this_method);
-
-            var result_of_this_method = instance_of_this_method.Network;
-
-            object obj_result_of_this_method = result_of_this_method;
-            if(obj_result_of_this_method is CrossBindingAdaptorType)
-            {    
-                return ILIntepreter.PushObject(__ret, __mStack, ((CrossBindingAdaptorType)obj_result_of_this_method).ILInstance);
-            }
-            return ILIntepreter.PushObject(__ret, __mStack, result_of_this_method);
-        }
-
-        static StackObject* Send_2(ILIntepreter __intp, StackObject* __esp, IList<object> __mStack, CLRMethod __method, bool isNewObj)
-        {
-            ILRuntime.Runtime.Enviorment.AppDomain __domain = __intp.AppDomain;
-            StackObject* ptr_of_this_method;
-            StackObject* __ret = ILIntepreter.Minus(__esp, 4);
-
-            ptr_of_this_method = ILIntepreter.Minus(__esp, 1);
-            System.Byte[] @bytes = (System.Byte[])typeof(System.Byte[]).CheckCLRTypes(StackObject.ToObject(ptr_of_this_method, __domain, __mStack));
-            __intp.Free(ptr_of_this_method);
-
-            ptr_of_this_method = ILIntepreter.Minus(__esp, 2);
-            System.UInt16 @opcode = (ushort)ptr_of_this_method->Value;
-
-            ptr_of_this_method = ILIntepreter.Minus(__esp, 3);
-            System.Byte @flag = (byte)ptr_of_this_method->Value;
-
-            ptr_of_this_method = ILIntepreter.Minus(__esp, 4);
-            ETModel.Session instance_of_this_method = (ETModel.Session)typeof(ETModel.Session).CheckCLRTypes(StackObject.ToObject(ptr_of_this_method, __domain, __mStack));
-            __intp.Free(ptr_of_this_method);
-
-            instance_of_this_method.Send(@flag, @opcode, @bytes);
-
-            return __ret;
-        }
-
-        static StackObject* Call_3(ILIntepreter __intp, StackObject* __esp, IList<object> __mStack, CLRMethod __method, bool isNewObj)
-        {
-            ILRuntime.Runtime.Enviorment.AppDomain __domain = __intp.AppDomain;
-            StackObject* ptr_of_this_method;
-            StackObject* __ret = ILIntepreter.Minus(__esp, 2);
-
-            ptr_of_this_method = ILIntepreter.Minus(__esp, 1);
-            ETModel.IRequest @request = (ETModel.IRequest)typeof(ETModel.IRequest).CheckCLRTypes(StackObject.ToObject(ptr_of_this_method, __domain, __mStack));
-            __intp.Free(ptr_of_this_method);
-
-            ptr_of_this_method = ILIntepreter.Minus(__esp, 2);
-            ETModel.Session instance_of_this_method = (ETModel.Session)typeof(ETModel.Session).CheckCLRTypes(StackObject.ToObject(ptr_of_this_method, __domain, __mStack));
-            __intp.Free(ptr_of_this_method);
-
-            var result_of_this_method = instance_of_this_method.Call(@request);
-
-            object obj_result_of_this_method = result_of_this_method;
-            if(obj_result_of_this_method is CrossBindingAdaptorType)
-            {    
-                return ILIntepreter.PushObject(__ret, __mStack, ((CrossBindingAdaptorType)obj_result_of_this_method).ILInstance);
-            }
-            return ILIntepreter.PushObject(__ret, __mStack, result_of_this_method);
-        }
-
-
-        static object get_Error_0(ref object o)
-        {
-            return ((ETModel.Session)o).Error;
-        }
-        static void set_Error_0(ref object o, object v)
-        {
-            ((ETModel.Session)o).Error = (System.Int32)v;
-        }
-
 
     }
 }

+ 3 - 4
Unity/Hotfix/Module/Message/Session.cs

@@ -87,13 +87,12 @@ namespace ETHotfix
 		public void Send(byte flag, IMessage message)
 		{
 			ushort opcode = Game.Scene.GetComponent<OpcodeTypeComponent>().GetOpcode(message.GetType());
-			byte[] bytes = ProtobufHelper.ToBytes(message);
-			session.Send(flag, opcode, bytes);
+			this.Send(flag, opcode, message);
 		}
 
-		public void Send(byte flag, ushort opcode, byte[] bytes)
+		public void Send(byte flag, ushort opcode, IMessage message)
 		{
-			session.Send(flag, opcode, bytes);
+			session.Send(flag, opcode, message);
 		}
 
 		public Task<IResponse> Call(IRequest request)