Sfoglia il codice sorgente

1.对象池限制一种对象最多1000个
2.benchmark的时候连接数太多,连接时间超过10秒,这里调成20秒超时
3.修复Vector3中的Angle方法错误
4.修复KService中waitAcceptChannels超时没有删除的问题
5.TChannel跨线程使用一个struct,减少gc
6.修复网络多线程bug

tanghai 3 anni fa
parent
commit
b705e20ceb
27 ha cambiato i file con 287 aggiunte e 214 eliminazioni
  1. 7 7
      Config/StartConfig/RouterTest/StartProcessConfigCategory.bytes
  2. 1 1
      DotNet/ThirdParty/UnityEngine/Vector3.cs
  3. 3 3
      Unity/Assets/Bundles/Config/UnitConfigCategory.bytes
  4. 7 7
      Unity/Assets/Config/Excel/Json/s/StartConfig/RouterTest/StartProcessConfig.txt
  5. BIN
      Unity/Assets/Config/Excel/StartConfig/RouterTest/StartProcessConfig@s.xlsx
  6. 1 1
      Unity/Assets/Scripts/Codes/Hotfix/Client/Demo/Router/RouterHelper.cs
  7. 1 2
      Unity/Assets/Scripts/Codes/Hotfix/Client/Demo/Session/NetClientComponentOnReadEvent.cs
  8. 1 1
      Unity/Assets/Scripts/Codes/Hotfix/Server/Demo/Scenes/Benchmark/BenchmarkClientComponentSystem.cs
  9. 0 2
      Unity/Assets/Scripts/Codes/Hotfix/Server/Demo/Session/NetServerComponentOnReadEvent.cs
  10. 1 12
      Unity/Assets/Scripts/Codes/Hotfix/Server/Module/Actor/ActorHelper.cs
  11. 2 2
      Unity/Assets/Scripts/Codes/Hotfix/Server/Module/Router/RouterComponentSystem.cs
  12. 1 0
      Unity/Assets/Scripts/Codes/Model/Share/Module/Message/OpcodeHelper.cs
  13. 1 1
      Unity/Assets/Scripts/Codes/Model/Share/Module/Message/Session.cs
  14. 1 1
      Unity/Assets/Scripts/Core/Module/Network/AService.cs
  15. 2 0
      Unity/Assets/Scripts/Core/Module/Network/ErrorCore.cs
  16. 20 49
      Unity/Assets/Scripts/Core/Module/Network/KChannel.cs
  17. 114 18
      Unity/Assets/Scripts/Core/Module/Network/KService.cs
  18. 4 17
      Unity/Assets/Scripts/Core/Module/Network/MessageSerializeHelper.cs
  19. 3 6
      Unity/Assets/Scripts/Core/Module/Network/NetServices.cs
  20. 0 12
      Unity/Assets/Scripts/Core/Module/Network/OpcodeRangeDefine.cs
  21. 20 47
      Unity/Assets/Scripts/Core/Module/Network/TChannel.cs
  22. 85 19
      Unity/Assets/Scripts/Core/Module/Network/TService.cs
  23. 2 2
      Unity/Assets/Scripts/Core/Module/Network/WChannel.cs
  24. 3 1
      Unity/Assets/Scripts/Core/Module/Network/WService.cs
  25. 6 0
      Unity/Assets/Scripts/Core/Module/ObjectPool/ObjectPool.cs
  26. 1 1
      Unity/Assets/Scripts/Core/Module/Timer/TimerComponent.cs
  27. 0 2
      Unity/Assets/Scripts/Core/TimerCoreCallbackId.cs

+ 7 - 7
Config/StartConfig/RouterTest/StartProcessConfigCategory.bytes

@@ -1,8 +1,8 @@
 
-
-
-
-
-
-
-
+
+
+
+
+
+
+

+ 1 - 1
DotNet/ThirdParty/UnityEngine/Vector3.cs

@@ -558,7 +558,7 @@ namespace UnityEngine
             to.Normalize();
             float result;
             Vector3.Dot(ref from, ref to, out result);
-            return Mathf.Cos(Mathf.Clamp(result, -1f, 1f)) * 57.29578f;
+            return Mathf.Acos(Mathf.Clamp(result, -1f, 1f)) * 57.29578f;
         }
 
         public static void Angle(ref Vector3 from, ref Vector3 to, out float result)

+ 3 - 3
Unity/Assets/Bundles/Config/UnitConfigCategory.bytes

@@ -1,4 +1,4 @@
 
-1é	米克尔"带有强力攻击技能(0²8D
-3ê
-米克尔2"带有强力攻击技能2(0–8N
+/é	米克尔"带有强力攻击技能(0²
+1ê
+米克尔2"带有强力攻击技能2(0–

+ 7 - 7
Unity/Assets/Config/Excel/Json/s/StartConfig/RouterTest/StartProcessConfig.txt

@@ -1,9 +1,9 @@
 {"list":[
-{"_t":"StartProcessConfig","_id":1,"MachineId":1,"InnerPort":20001},
-{"_t":"StartProcessConfig","_id":2,"MachineId":1,"InnerPort":20002},
-{"_t":"StartProcessConfig","_id":3,"MachineId":1,"InnerPort":20003},
-{"_t":"StartProcessConfig","_id":4,"MachineId":1,"InnerPort":20004},
-{"_t":"StartProcessConfig","_id":5,"MachineId":1,"InnerPort":20005},
-{"_t":"StartProcessConfig","_id":6,"MachineId":1,"InnerPort":20006},
-{"_t":"StartProcessConfig","_id":7,"MachineId":1,"InnerPort":20007},
+{"_t":"StartProcessConfig","_id":1,"MachineId":1,"InnerPort":20101},
+{"_t":"StartProcessConfig","_id":2,"MachineId":1,"InnerPort":20102},
+{"_t":"StartProcessConfig","_id":3,"MachineId":1,"InnerPort":20103},
+{"_t":"StartProcessConfig","_id":4,"MachineId":1,"InnerPort":20104},
+{"_t":"StartProcessConfig","_id":5,"MachineId":1,"InnerPort":20105},
+{"_t":"StartProcessConfig","_id":6,"MachineId":1,"InnerPort":20106},
+{"_t":"StartProcessConfig","_id":7,"MachineId":1,"InnerPort":20107},
 ]}

BIN
Unity/Assets/Config/Excel/StartConfig/RouterTest/StartProcessConfig@s.xlsx


+ 1 - 1
Unity/Assets/Scripts/Codes/Hotfix/Client/Demo/Router/RouterHelper.cs

@@ -45,7 +45,7 @@ namespace ET.Client
             
             using Socket socket = new Socket(routerAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
             
-            int count = 10;
+            int count = 20;
             byte[] sendCache = new byte[512];
             byte[] recvCache = new byte[512];
 

+ 1 - 2
Unity/Assets/Scripts/Codes/Hotfix/Client/Demo/Session/NetClientComponentOnReadEvent.cs

@@ -12,8 +12,7 @@
                 session.OnResponse(response);
                 return;
             }
-
-            OpcodeHelper.LogMsg(session.DomainZone(), message);
+            
             // 普通消息或者是Rpc请求消息
             MessageDispatcherComponent.Instance.Handle(session, message);
             await ETTask.CompletedTask;

+ 1 - 1
Unity/Assets/Scripts/Codes/Hotfix/Server/Demo/Scenes/Benchmark/BenchmarkClientComponentSystem.cs

@@ -27,7 +27,7 @@ namespace ET.Server
             NetClientComponent netClientComponent = scene.AddComponent<NetClientComponent, AddressFamily>(AddressFamily.InterNetwork);
 
             using Session session = netClientComponent.Create(StartSceneConfigCategory.Instance.BenchmarkServer.OuterIPPort);
-            List<ETTask<IResponse>> list = new List<ETTask<IResponse>>(10000);
+            List<ETTask<IResponse>> list = new List<ETTask<IResponse>>(100000);
             for (int j = 0; j < 100000000; ++j)
             {
                 list.Clear();

+ 0 - 2
Unity/Assets/Scripts/Codes/Hotfix/Server/Demo/Session/NetServerComponentOnReadEvent.cs

@@ -13,8 +13,6 @@
                 session.OnResponse(response);
                 return;
             }
-
-            OpcodeHelper.LogMsg(session.DomainZone(), message);
 			
             // 根据消息接口判断是不是Actor消息,不同的接口做不同的处理
             switch (message)

+ 1 - 12
Unity/Assets/Scripts/Codes/Hotfix/Server/Module/Actor/ActorHelper.cs

@@ -18,18 +18,7 @@ namespace ET.Server
         {
             ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), 8);
             Type type = NetServices.Instance.GetType(opcode);
-
-            if (opcode < OpcodeRangeDefine.PbMaxOpcode)
-            {
-                return ProtobufHelper.FromBytes(type, memoryStream.GetBuffer(), 10, (int)memoryStream.Length - 10);
-            }
-
-            if (opcode >= OpcodeRangeDefine.JsonMinOpcode)
-            {
-                return JsonHelper.FromJson(type, memoryStream.GetBuffer().ToStr(10, (int)(memoryStream.Length - 10)));
-            }
-
-            return MongoHelper.FromBson(type, memoryStream.GetBuffer(), 10, (int)memoryStream.Length - 10);
+            return ProtobufHelper.FromBytes(type, memoryStream.GetBuffer(), 10, (int)memoryStream.Length - 10);
         }
     }
 }

+ 2 - 2
Unity/Assets/Scripts/Codes/Hotfix/Server/Module/Router/RouterComponentSystem.cs

@@ -10,7 +10,7 @@ namespace ET.Server
     public static class RouterComponentSystem
     {
         [ObjectSystem]
-        public class RouterComponentAwakeSystem: AwakeSystem<RouterComponent, IPEndPoint, string>
+        public class RandomGeneratorponentAwakeSystem: AwakeSystem<RouterComponent, IPEndPoint, string>
         {
             protected override void Awake(RouterComponent self, IPEndPoint outerAddress, string innerIP)
             {
@@ -317,7 +317,7 @@ namespace ET.Server
                         break;
                     }
 
-                    if (++kcpRouter.SyncCount > 10)
+                    if (++kcpRouter.SyncCount > 20)
                     {
                         self.OnError(kcpRouter.Id, ErrorCore.ERR_KcpRouterSyncCountTooMuchTimes);
                         break;

+ 1 - 0
Unity/Assets/Scripts/Codes/Model/Share/Module/Message/OpcodeHelper.cs

@@ -11,6 +11,7 @@ namespace ET
             OuterMessage.G2C_Ping,
             OuterMessage.C2G_Benchmark,
             OuterMessage.G2C_Benchmark,
+            ushort.MaxValue, // ActorResponse
         };
 
         private static bool IsNeedLogMessage(ushort opcode)

+ 1 - 1
Unity/Assets/Scripts/Codes/Model/Share/Module/Message/Session.cs

@@ -42,7 +42,7 @@ namespace ET
         {
             protected override void Destroy(Session self)
             {
-                NetServices.Instance.RemoveChannel(self.ServiceId, self.Id);
+                NetServices.Instance.RemoveChannel(self.ServiceId, self.Id, self.Error);
             
                 foreach (RpcInfo responseCallback in self.requestCallbacks.Values.ToArray())
                 {

+ 1 - 1
Unity/Assets/Scripts/Core/Module/Network/AService.cs

@@ -16,7 +16,7 @@ namespace ET
 
         public abstract void Update();
 
-        public abstract void Remove(long id);
+        public abstract void Remove(long id, int error = 0);
         
         public abstract bool IsDispose();
 

+ 2 - 0
Unity/Assets/Scripts/Core/Module/Network/ErrorCore.cs

@@ -1,10 +1,12 @@
 namespace ET
 {
+    [UniqueId(100000, 500000)]
     public static class ErrorCore
     {
         public const int ERR_MyErrorCode = 110000;
         
         public const int ERR_KcpConnectTimeout = 100205;
+        public const int ERR_KcpAcceptTimeout = 100206;
         public const int ERR_PeerDisconnect = 100208;
         public const int ERR_SocketCantSend = 100209;
         public const int ERR_SocketError = 100210;

+ 20 - 49
Unity/Assets/Scripts/Core/Module/Network/KChannel.cs

@@ -17,33 +17,13 @@ namespace ET
 	
 	public class KChannel : AChannel
 	{
-		[Callback(TimerCoreCallbackId.KServiceConnectTimer)]
-		public class KServiceConnectTimer: ATimer<KChannel>
-		{
-			protected override void Run(KChannel kChannel)
-			{
-				kChannel.Connect();
-			}
-		}
-		
-		[Callback(TimerCoreCallbackId.KServiceNextUpdateTimer)]
-		public class KServiceNextUpdateTimer: ATimer<KChannel>
-		{
-			protected override void Run(KChannel kChannel)
-			{
-				kChannel.Service.AddToUpdate(kChannel.Id);
-			}
-		}
-
-		private readonly KService Service;
+		public readonly KService Service;
 		
 		private Socket socket;
 
 		public IntPtr kcp { get; private set; }
 
 		private readonly Queue<KcpWaitPacket> sendBuffer = new Queue<KcpWaitPacket>();
-
-		private uint lastRecvTime;
 		
 		public readonly uint CreateTime;
 
@@ -68,7 +48,7 @@ namespace ET
 		
 		private const int maxPacketSize = 10000;
 
-		private MemoryStream ms = new MemoryStream(maxPacketSize);
+		private readonly MemoryStream ms = new MemoryStream(maxPacketSize);
 
 		private MemoryStream readMemory;
 		private int needReadSplitCount;
@@ -107,10 +87,9 @@ namespace ET
 			this.Service = kService;
 			this.RemoteAddress = remoteEndPoint;
 			this.socket = socket;
-			this.lastRecvTime = kService.TimeNow;
 			this.CreateTime = kService.TimeNow;
 
-			this.Connect();
+			this.Connect(this.CreateTime);
 
 		}
 
@@ -129,7 +108,6 @@ namespace ET
 			this.kcp = Kcp.KcpCreate(this.RemoteConn, new IntPtr(this.Service.Id));
 			this.InitKcp();
 			
-			this.lastRecvTime = kService.TimeNow;
 			this.CreateTime = kService.TimeNow;
 		}
 	
@@ -143,7 +121,7 @@ namespace ET
 
 			uint localConn = this.LocalConn;
 			uint remoteConn = this.RemoteConn;
-			Log.Info($"channel dispose: {localConn} {remoteConn}");
+			Log.Info($"channel dispose: {localConn} {remoteConn} {this.Error}");
 			
 			long id = this.Id;
 			this.Id = 0;
@@ -151,9 +129,11 @@ namespace ET
 
 			try
 			{
-				//this.Service.Disconnect(localConn, remoteConn, this.Error, this.RemoteAddress, 3);
+				if (this.Error != ErrorCore.ERR_PeerDisconnect)
+				{
+					this.Service.Disconnect(localConn, remoteConn, this.Error, this.RemoteAddress, 3);
+				}
 			}
-
 			catch (Exception e)
 			{
 				Log.Error(e);
@@ -181,7 +161,6 @@ namespace ET
 
 			Log.Info($"channel connected: {this.LocalConn} {this.RemoteConn} {this.RemoteAddress}");
 			this.IsConnected = true;
-			this.lastRecvTime = this.Service.TimeNow;
 			
 			while (true)
 			{
@@ -198,7 +177,7 @@ namespace ET
 		/// <summary>
 		/// 发送请求连接消息
 		/// </summary>
-		private void Connect()
+		private void Connect(uint timeNow)
 		{
 			try
 			{
@@ -207,18 +186,14 @@ namespace ET
 					return;
 				}
 				
-				uint timeNow = this.Service.TimeNow;
-				
-				// 5秒连接超时
-				if (timeNow > this.CreateTime + 5 * 1000)
+				// 10秒连接超时
+				if (timeNow > this.CreateTime + KService.ConnectTimeoutTime)
 				{
 					Log.Error($"kChannel connect timeout: {this.Id} {this.RemoteConn} {timeNow} {this.CreateTime} {this.ChannelType} {this.RemoteAddress}");
 					this.OnError(ErrorCore.ERR_KcpConnectTimeout);
 					return;
 				}
 				
-				this.lastRecvTime = timeNow;
-				
 				byte[] buffer = sendCache;
 				buffer.WriteTo(0, KcpProtocalType.SYN);
 				buffer.WriteTo(1, this.LocalConn);
@@ -227,8 +202,7 @@ namespace ET
 				Log.Info($"kchannel connect {this.LocalConn} {this.RemoteConn} {this.RealAddress} {this.socket.LocalEndPoint}");
 				
 				// 300毫秒后再次update发送connect请求
-				long tillTime = TimeHelper.ClientNow() + 300;
-				NetServices.Instance.TimerComponent.NewOnceTimer(tillTime, TimerCoreCallbackId.KServiceConnectTimer, this);
+				this.Service.AddToUpdate(timeNow + 300, this.Id);
 			}
 			catch (Exception e)
 			{
@@ -237,18 +211,17 @@ namespace ET
 			}
 		}
 
-		public void Update()
+		public void Update(uint timeNow)
 		{
 			if (this.IsDisposed)
 			{
 				return;
 			}
-
-			uint timeNow = this.Service.TimeNow;
 			
 			// 如果还没连接上,发送连接请求
 			if (!this.IsConnected && this.ChannelType == ChannelType.Connect)
 			{
+				this.Connect(timeNow);
 				return;
 			}
 
@@ -269,8 +242,7 @@ namespace ET
 			}
 
 			uint nextUpdateTime = Kcp.KcpCheck(this.kcp, timeNow);
-			long tillTime = nextUpdateTime - timeNow + TimeHelper.ClientNow();
-			NetServices.Instance.TimerComponent.NewOnceTimer(tillTime, TimerCoreCallbackId.KServiceNextUpdateTimer, this);
+			this.Service.AddToUpdate(nextUpdateTime, this.Id);
 		}
 
 		public void HandleRecv(byte[] date, int offset, int length)
@@ -281,7 +253,7 @@ namespace ET
 			}
 
 			Kcp.KcpInput(this.kcp, date, offset, length);
-			this.Service.AddToUpdate(this.Id);
+			this.Service.AddToUpdate(0, this.Id);
 
 			while (true)
 			{
@@ -369,7 +341,6 @@ namespace ET
 						this.readMemory.Seek(Packet.OpcodeLength, SeekOrigin.Begin);
 						break;
 				}
-				this.lastRecvTime = this.Service.TimeNow;
 				MemoryStream mem = this.readMemory;
 				this.readMemory = null;
 				this.OnRead(mem);
@@ -460,7 +431,7 @@ namespace ET
 				}
 			}
 
-			this.Service.AddToUpdate(this.Id);
+			this.Service.AddToUpdate(0, this.Id);
 		}
 		
 		public void Send(long actorId, MemoryStream stream)
@@ -514,7 +485,7 @@ namespace ET
 					{
 						ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.KcpOpcodeIndex);
 						Type type = NetServices.Instance.GetType(opcode);
-						message = MessageSerializeHelper.DeserializeFrom(opcode, type, memoryStream);
+						message = MessageSerializeHelper.DeserializeFrom(type, memoryStream);
 						break;
 					}
 					case ServiceType.Inner:
@@ -522,7 +493,7 @@ namespace ET
 						actorId = BitConverter.ToInt64(memoryStream.GetBuffer(), Packet.ActorIdIndex);
 						ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.OpcodeIndex);
 						Type type = NetServices.Instance.GetType(opcode);
-						message = MessageSerializeHelper.DeserializeFrom(opcode, type, memoryStream);
+						message = MessageSerializeHelper.DeserializeFrom(type, memoryStream);
 						break;
 					}
 				}
@@ -538,7 +509,7 @@ namespace ET
 		public void OnError(int error)
 		{
 			long channelId = this.Id;
-			this.Service.Remove(channelId);
+			this.Service.Remove(channelId, error);
 			NetServices.Instance.OnError(this.Service.Id, channelId, error);
 		}
 	}

+ 114 - 18
Unity/Assets/Scripts/Core/Module/Network/KService.cs

@@ -28,6 +28,8 @@ namespace ET
 
     public sealed class KService: AService
     {
+        public const int ConnectTimeoutTime = 20 * 1000;
+
         public readonly Dictionary<IntPtr, KChannel> KcpPtrChannels = new Dictionary<IntPtr, KChannel>();
         
         // KService创建的时间
@@ -151,14 +153,20 @@ namespace ET
 
         // 保存所有的channel
         private readonly Dictionary<long, KChannel> localConnChannels = new Dictionary<long, KChannel>();
-        private readonly Dictionary<long, KChannel> waitConnectChannels = new Dictionary<long, KChannel>();
+        private readonly Dictionary<long, KChannel> waitAcceptChannels = new Dictionary<long, KChannel>();
 
-        private readonly byte[] cache = new byte[8192];
+        private readonly byte[] cache = new byte[2048];
         private EndPoint ipEndPoint = new IPEndPoint(IPAddress.Any, 0);
 
         // 下帧要更新的channel
         private readonly HashSet<long> updateIds = new HashSet<long>();
         
+        // 下次时间更新的channel
+        private readonly MultiMap<long, long> timeId = new MultiMap<long, long>();
+        private readonly List<long> timeOutTime = new List<long>();
+        // 记录最小时间,不用每次都去MultiMap取第一个值
+        private long minTime;
+        
         public override bool IsDispose()
         {
             return this.socket == null;
@@ -286,11 +294,11 @@ namespace ET
                             remoteConn = BitConverter.ToUInt32(this.cache, 1);
                             localConn = BitConverter.ToUInt32(this.cache, 5);
 
-                            this.waitConnectChannels.TryGetValue(remoteConn, out kChannel);
+                            this.waitAcceptChannels.TryGetValue(remoteConn, out kChannel);
                             if (kChannel == null)
                             {
                                 // accept的localConn不能与connect的localConn冲突,所以设置为一个大的数
-                                // localConn被人猜出来问题不大,因为remoteConn是随机的第三方并不知道
+                                // localConn被人猜出来问题不大,因为remoteConn是随机的,第三方并不知道
                                 localConn = NetServices.Instance.CreateAcceptChannelId();
                                 // 已存在同样的localConn,则不处理,等待下次sync
                                 if (this.localConnChannels.ContainsKey(localConn))
@@ -299,9 +307,9 @@ namespace ET
                                 }
 
                                 kChannel = new KChannel(localConn, remoteConn, this.socket, this.CloneAddress(), this);
-                                this.waitConnectChannels.Add(kChannel.RemoteConn, kChannel); // 连接上了或者超时后会删除
+                                this.waitAcceptChannels.Add(kChannel.RemoteConn, kChannel); // 连接上了或者超时后会删除
                                 this.localConnChannels.Add(kChannel.LocalConn, kChannel);
-
+                                
                                 kChannel.RealAddress = realAddress;
 
                                 IPEndPoint realEndPoint = kChannel.RealAddress == null? kChannel.RemoteAddress : NetworkHelper.ToIPEndPoint(kChannel.RealAddress);
@@ -410,8 +418,9 @@ namespace ET
                             if (!kChannel.IsConnected)
                             {
                                 kChannel.IsConnected = true;
-                                this.waitConnectChannels.Remove(remoteConn);
+                                this.waitAcceptChannels.Remove(kChannel.RemoteConn);
                             }
+
                             kChannel.HandleRecv(this.cache, 5, messageLength - 5);
                             break;
                     }
@@ -450,20 +459,23 @@ namespace ET
             }
         }
 
-        public override void Remove(long id)
+        public override void Remove(long id, int error = 0)
         {
             if (!this.localConnChannels.TryGetValue(id, out KChannel kChannel))
             {
                 return;
             }
-            Log.Info($"kservice remove channel: {id} {kChannel.LocalConn} {kChannel.RemoteConn}");
+
+            kChannel.Error = error;
+            
+            Log.Info($"kservice remove channel: {id} {kChannel.LocalConn} {kChannel.RemoteConn} {error}");
             this.localConnChannels.Remove(id);
             this.localConnChannels.Remove(kChannel.LocalConn);
-            if (this.waitConnectChannels.TryGetValue(kChannel.RemoteConn, out KChannel waitChannel))
+            if (this.waitAcceptChannels.TryGetValue(kChannel.RemoteConn, out KChannel waitChannel))
             {
                 if (waitChannel.LocalConn == kChannel.LocalConn)
                 {
-                    this.waitConnectChannels.Remove(kChannel.RemoteConn);
+                    this.waitAcceptChannels.Remove(kChannel.RemoteConn);
                 }
             }
 
@@ -472,7 +484,7 @@ namespace ET
             kChannel.Dispose();
         }
 
-        private void Disconnect(uint localConn, uint remoteConn, int error, IPEndPoint address, int times)
+        public void Disconnect(uint localConn, uint remoteConn, int error, IPEndPoint address, int times)
         {
             try
             {
@@ -511,12 +523,49 @@ namespace ET
 
         public override void Update()
         {
-            this.Recv();
+            uint timeNow = this.TimeNow;
+            
+            this.TimerOut(timeNow);
             
-            this.UpdateChannel();
+            this.CheckWaitAcceptChannel(timeNow);
+            
+            this.Recv();
+
+            this.UpdateChannel(timeNow);
         }
 
-        private void UpdateChannel()
+        private readonly List<KChannel> removeWaitAcceptChannels = new List<KChannel>();
+        private void CheckWaitAcceptChannel(uint timeNow)
+        {
+            removeWaitAcceptChannels.Clear();
+            foreach (var kv in this.waitAcceptChannels)
+            {
+                KChannel kChannel = kv.Value;
+                if (kChannel.IsDisposed)
+                {
+                    continue;
+                }
+
+                if (kChannel.IsConnected)
+                {
+                    continue;
+                }
+
+                if (timeNow < kChannel.CreateTime + ConnectTimeoutTime)
+                {
+                    continue;
+                }
+
+                removeWaitAcceptChannels.Add(kChannel);
+            }
+
+            foreach (KChannel kChannel in this.removeWaitAcceptChannels)
+            {
+                kChannel.OnError(ErrorCore.ERR_KcpAcceptTimeout);
+            }
+        }
+
+        private void UpdateChannel(uint timeNow)
         {
             foreach (long id in this.updateIds)
             {
@@ -531,15 +580,62 @@ namespace ET
                     continue;
                 }
 
-                kChannel.Update();
+                kChannel.Update(timeNow);
             }
             this.updateIds.Clear();
         }
         
         // 服务端需要看channel的update时间是否已到
-        public void AddToUpdate(long id)
+        public void AddToUpdate(long time, long id)
         {
-            this.updateIds.Add(id);
+            if (time == 0)
+            {
+                this.updateIds.Add(id);
+                return;
+            }
+            if (time < this.minTime)
+            {
+                this.minTime = time;
+            }
+            this.timeId.Add(time, id);
+        }
+        
+        // 计算到期需要update的channel
+        private void TimerOut(uint timeNow)
+        {
+            if (this.timeId.Count == 0)
+            {
+                return;
+            }
+            
+
+            if (timeNow < this.minTime)
+            {
+                return;
+            }
+
+            this.timeOutTime.Clear();
+
+            foreach (KeyValuePair<long, List<long>> kv in this.timeId)
+            {
+                long k = kv.Key;
+                if (k > timeNow)
+                {
+                    minTime = k;
+                    break;
+                }
+
+                this.timeOutTime.Add(k);
+            }
+
+            foreach (long k in this.timeOutTime)
+            {
+                foreach (long v in this.timeId[k])
+                {
+                    this.updateIds.Add(v);
+                }
+                this.timeId.Remove(k);
+            }
         }
     }
 }

+ 4 - 17
Unity/Assets/Scripts/Core/Module/Network/MessageSerializeHelper.cs

@@ -5,33 +5,20 @@ namespace ET
 {
     public static class MessageSerializeHelper
     {
-        public static object DeserializeFrom(ushort opcode, Type type, MemoryStream memoryStream)
+        public static object DeserializeFrom(Type type, MemoryStream memoryStream)
         {
-            if (opcode >= OpcodeRangeDefine.JsonMinOpcode)
-            {
-                return JsonHelper.FromJson(type, memoryStream.GetBuffer().ToStr((int)memoryStream.Position, (int)(memoryStream.Length - memoryStream.Position)));
-            }
-            
             return ProtobufHelper.FromStream(type, memoryStream);
         }
 
-        public static void SerializeTo(ushort opcode, object obj, MemoryStream memoryStream)
+        public static void SerializeTo(object obj, MemoryStream memoryStream)
         {
             try
             {
-                if (opcode >= OpcodeRangeDefine.JsonMinOpcode)
-                {
-                    string s = JsonHelper.ToJson(obj);
-                    byte[] bytes = s.ToUtf8();
-                    memoryStream.Write(bytes, 0, bytes.Length);
-                    return;
-                }
-                
                 ProtobufHelper.ToStream(obj, memoryStream);
             }
             catch (Exception e)
             {
-                throw new Exception($"SerializeTo error: {opcode}", e);
+                throw new Exception($"SerializeTo error: {obj}", e);
             }
 
         }
@@ -63,7 +50,7 @@ namespace ET
             
             stream.GetBuffer().WriteTo(headOffset, opcode);
             
-            MessageSerializeHelper.SerializeTo(opcode, message, stream);
+            MessageSerializeHelper.SerializeTo(message, stream);
             
             stream.Seek(0, SeekOrigin.Begin);
             return (opcode, stream);

+ 3 - 6
Unity/Assets/Scripts/Core/Module/Network/NetServices.cs

@@ -131,9 +131,9 @@ namespace ET
             this.netThreadOperators.Enqueue(netOperator);
         }
         
-        public void RemoveChannel(int serviceId, long channelId)
+        public void RemoveChannel(int serviceId, long channelId, int error)
         {
-            NetOperator netOperator = new NetOperator() { Op = NetOp.RemoveChannel, ServiceId = serviceId, ChannelId = channelId};
+            NetOperator netOperator = new NetOperator() { Op = NetOp.RemoveChannel, ServiceId = serviceId, ChannelId = channelId, ActorId = error};
             this.netThreadOperators.Enqueue(netOperator);
         }
 
@@ -216,7 +216,6 @@ namespace ET
         
         private readonly Dictionary<int, AService> services = new Dictionary<int, AService>();
         private readonly Queue<int> queue = new Queue<int>();
-        public readonly TimerComponent TimerComponent = new TimerComponent();
         
         private void Add(AService aService)
         {
@@ -269,7 +268,7 @@ namespace ET
                         case NetOp.RemoveChannel:
                         {
                             AService service = this.Get(op.ServiceId);
-                            service.Remove(op.ChannelId);
+                            service.Remove(op.ChannelId, (int)op.ActorId);
                             break;
                         }
                         case NetOp.SendStream:
@@ -334,8 +333,6 @@ namespace ET
             }
             
             this.RunNetThreadOperator();
-            
-            TimerComponent.Update();
         }
 
         public void OnAccept(int serviceId, long channelId, IPEndPoint ipEndPoint)

+ 0 - 12
Unity/Assets/Scripts/Core/Module/Network/OpcodeRangeDefine.cs

@@ -2,25 +2,13 @@
 {
     public static class OpcodeRangeDefine
     {
-        // 10001 - 30000 是pb,中间分成两个部分,外网pb跟内网pb
-        public const ushort PbMinOpcode = 10001;
-        
         public const ushort OuterMinOpcode = 10001;
         public const ushort OuterMaxOpcode = 20000;
 
         // 20001-30000 内网pb
         public const ushort InnerMinOpcode = 20001;
-        
-        public const ushort PbMaxOpcode = 30000;
-        
-        // 30001 - 40000 是bson,bson只用于内网
-        public const ushort MongoMinOpcode = 30001;
-        
         public const ushort InnerMaxOpcode = 40000;
         
-        public const ushort JsonMinOpcode = 50000;
-        public const ushort JsonMaxOpcode = 60000;
-        
         public const ushort MaxOpcode = 60000;
     }
 }

+ 20 - 47
Unity/Assets/Scripts/Core/Module/Network/TChannel.cs

@@ -18,8 +18,6 @@ namespace ET
 
 		private readonly CircularBuffer recvBuffer = new CircularBuffer();
 		private readonly CircularBuffer sendBuffer = new CircularBuffer();
-		
-		private readonly ConcurrentQueue<Action> queue = new ConcurrentQueue<Action>();
 
 		private bool isSending;
 
@@ -31,23 +29,7 @@ namespace ET
 
 		private void OnComplete(object sender, SocketAsyncEventArgs e)
 		{
-			switch (e.LastOperation)
-			{
-				case SocketAsyncOperation.Connect:
-					this.Service.ThreadSynchronizationContext.Post(()=>OnConnectComplete(e));
-					break;
-				case SocketAsyncOperation.Receive:
-					this.Service.ThreadSynchronizationContext.Post(()=>OnRecvComplete(e));
-					break;
-				case SocketAsyncOperation.Send:
-					this.Service.ThreadSynchronizationContext.Post(()=>OnSendComplete(e));
-					break;
-				case SocketAsyncOperation.Disconnect:
-					this.Service.ThreadSynchronizationContext.Post(()=>OnDisconnectComplete(e));
-					break;
-				default:
-					throw new Exception($"socket error: {e.LastOperation}");
-			}
+			this.Service.Queue.Enqueue(new TArgs() {ChannelId = this.Id, SocketAsyncEventArgs = e});
 		}
 		
 		public TChannel(long id, IPEndPoint ipEndPoint, TService service)
@@ -65,7 +47,7 @@ namespace ET
 			this.isConnected = false;
 			this.isSending = false;
 
-			this.Service.ThreadSynchronizationContext.Post(this.ConnectAsync);
+			this.ConnectAsync();
 		}
 		
 		public TChannel(long id, Socket socket, TService service)
@@ -83,12 +65,8 @@ namespace ET
 			this.isConnected = true;
 			this.isSending = false;
 			
-			// 下一帧再开始读写
-			this.Service.ThreadSynchronizationContext.Post(() =>
-			{
-				this.StartRecv();
-				this.StartSend();
-			});
+			this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartSend, ChannelId = this.Id});
+			this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartRecv, ChannelId = this.Id});
 		}
 		
 		
@@ -100,7 +78,7 @@ namespace ET
 				return;
 			}
 
-			Log.Info($"channel dispose: {this.Id} {this.RemoteAddress}");
+			Log.Info($"channel dispose: {this.Id} {this.RemoteAddress} {this.Error}");
 			
 			long id = this.Id;
 			this.Id = 0;
@@ -154,7 +132,7 @@ namespace ET
 			if (!this.isSending)
 			{
 				//this.StartSend();
-				this.Service.NeedStartSend.Add(this.Id);
+				this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartSend, ChannelId = this.Id});
 			}
 		}
 
@@ -168,13 +146,12 @@ namespace ET
 			OnConnectComplete(this.outArgs);
 		}
 
-		private void OnConnectComplete(object o)
+		public void OnConnectComplete(SocketAsyncEventArgs e)
 		{
 			if (this.socket == null)
 			{
 				return;
 			}
-			SocketAsyncEventArgs e = (SocketAsyncEventArgs) o;
 			
 			if (e.SocketError != SocketError.Success)
 			{
@@ -184,17 +161,17 @@ namespace ET
 
 			e.RemoteEndPoint = null;
 			this.isConnected = true;
-			this.StartRecv();
-			this.StartSend();
+			
+			this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartSend, ChannelId = this.Id});
+			this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartRecv, ChannelId = this.Id});
 		}
 
-		private void OnDisconnectComplete(object o)
+		public void OnDisconnectComplete(SocketAsyncEventArgs e)
 		{
-			SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
 			this.OnError((int)e.SocketError);
 		}
 
-		private void StartRecv()
+		public void StartRecv()
 		{
 			while (true)
 			{
@@ -223,7 +200,7 @@ namespace ET
 			}
 		}
 
-		private void OnRecvComplete(object o)
+		public void OnRecvComplete(object o)
 		{
 			this.HandleRecv(o);
 			
@@ -231,7 +208,8 @@ namespace ET
 			{
 				return;
 			}
-			this.StartRecv();
+			
+			this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartRecv, ChannelId = this.Id});
 		}
 
 		private void HandleRecv(object o)
@@ -288,12 +266,7 @@ namespace ET
 			}
 		}
 
-		public void Update()
-		{
-			this.StartSend();
-		}
-
-		private void StartSend()
+		public void StartSend()
 		{
 			if(!this.isConnected)
 			{
@@ -346,13 +319,13 @@ namespace ET
 			}
 		}
 
-		private void OnSendComplete(object o)
+		public void OnSendComplete(object o)
 		{
 			HandleSend(o);
 			
 			this.isSending = false;
 			
-			this.StartSend();
+			this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartSend, ChannelId = this.Id});
 		}
 
 		private void HandleSend(object o)
@@ -397,7 +370,7 @@ namespace ET
 					{
 						ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.KcpOpcodeIndex);
 						Type type = NetServices.Instance.GetType(opcode);
-						message = MessageSerializeHelper.DeserializeFrom(opcode, type, memoryStream);
+						message = MessageSerializeHelper.DeserializeFrom(type, memoryStream);
 						break;
 					}
 					case ServiceType.Inner:
@@ -405,7 +378,7 @@ namespace ET
 						actorId = BitConverter.ToInt64(memoryStream.GetBuffer(), Packet.ActorIdIndex);
 						ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.OpcodeIndex);
 						Type type = NetServices.Instance.GetType(opcode);
-						message = MessageSerializeHelper.DeserializeFrom(opcode, type, memoryStream);
+						message = MessageSerializeHelper.DeserializeFrom(type, memoryStream);
 						break;
 					}
 				}

+ 85 - 19
Unity/Assets/Scripts/Core/Module/Network/TService.cs

@@ -1,4 +1,5 @@
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.IO;
 using System.Linq;
@@ -7,6 +8,20 @@ using System.Net.Sockets;
 
 namespace ET
 {
+	public enum TcpOp
+	{
+		StartSend,
+		StartRecv,
+		SocketAsyncEventArgs,
+	}
+	
+	public struct TArgs
+	{
+		public TcpOp Op;
+		public long ChannelId;
+		public SocketAsyncEventArgs SocketAsyncEventArgs;
+	}
+	
 	public sealed class TService : AService
 	{
 		private readonly Dictionary<long, TChannel> idChannels = new Dictionary<long, TChannel>();
@@ -15,20 +30,16 @@ namespace ET
 		
 		private Socket acceptor;
 
-		public HashSet<long> NeedStartSend = new HashSet<long>();
-
-		public ThreadSynchronizationContext ThreadSynchronizationContext;
+		public ConcurrentQueue<TArgs> Queue = new ConcurrentQueue<TArgs>();
 
 		public TService(AddressFamily addressFamily, ServiceType serviceType)
 		{
 			this.ServiceType = serviceType;
-			this.ThreadSynchronizationContext = new ThreadSynchronizationContext();
 		}
 
 		public TService(IPEndPoint ipEndPoint, ServiceType serviceType)
 		{
 			this.ServiceType = serviceType;
-			this.ThreadSynchronizationContext = new ThreadSynchronizationContext();
 			
 			this.acceptor = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
 			// 容易出问题,先注释掉,按需开启
@@ -45,7 +56,7 @@ namespace ET
 			
 			this.acceptor.Listen(1000);
 			
-			this.ThreadSynchronizationContext.Post(this.AcceptAsync);
+			this.AcceptAsync();
 		}
 
 		private void OnComplete(object sender, SocketAsyncEventArgs e)
@@ -53,9 +64,7 @@ namespace ET
 			switch (e.LastOperation)
 			{
 				case SocketAsyncOperation.Accept:
-					SocketError socketError = e.SocketError;
-					Socket acceptSocket = e.AcceptSocket;
-					this.ThreadSynchronizationContext.Post(()=>{this.OnAcceptComplete(socketError, acceptSocket);});
+					this.Queue.Enqueue(new TArgs() {SocketAsyncEventArgs = e});
 					break;
 				default:
 					throw new Exception($"socket error: {e.LastOperation}");
@@ -133,7 +142,6 @@ namespace ET
 			this.acceptor?.Close();
 			this.acceptor = null;
 			this.innArgs.Dispose();
-			ThreadSynchronizationContext = null;
 			
 			foreach (long id in this.idChannels.Keys.ToArray())
 			{
@@ -143,13 +151,13 @@ namespace ET
 			this.idChannels.Clear();
 		}
 
-		public override void Remove(long id)
+		public override void Remove(long id, int error = 0)
 		{
 			if (this.idChannels.TryGetValue(id, out TChannel channel))
 			{
+				channel.Error = error;
 				channel.Dispose();	
 			}
-
 			this.idChannels.Remove(id);
 		}
 
@@ -173,19 +181,77 @@ namespace ET
 		
 		public override void Update()
 		{
-			this.ThreadSynchronizationContext.Update();
-			
-			foreach (long channelId in this.NeedStartSend)
+			while (true)
 			{
-				TChannel tChannel = this.Get(channelId);
-				tChannel?.Update();
+				if (!this.Queue.TryDequeue(out var result))
+				{
+					break;
+				}
+				
+				SocketAsyncEventArgs e = result.SocketAsyncEventArgs;
+
+				if (e == null)
+				{
+					switch (result.Op)
+					{
+						case TcpOp.StartSend:
+						{
+							TChannel tChannel = this.Get(result.ChannelId);
+							tChannel.StartSend();
+							break;
+						}
+						case TcpOp.StartRecv:
+						{
+							TChannel tChannel = this.Get(result.ChannelId);
+							tChannel.StartRecv();
+							break;
+						}
+					}
+					continue;
+				}
+
+				switch (e.LastOperation)
+				{
+					case SocketAsyncOperation.Accept:
+					{
+						SocketError socketError = e.SocketError;
+						Socket acceptSocket = e.AcceptSocket;
+						this.OnAcceptComplete(socketError, acceptSocket);
+						break;
+					}
+					case SocketAsyncOperation.Connect:
+					{
+						TChannel tChannel = this.Get(result.ChannelId);
+						tChannel.OnConnectComplete(e);
+						break;
+					}
+					case SocketAsyncOperation.Disconnect:
+					{
+						TChannel tChannel = this.Get(result.ChannelId);
+						tChannel.OnDisconnectComplete(e);
+						break;
+					}
+					case SocketAsyncOperation.Receive:
+					{
+						TChannel tChannel = this.Get(result.ChannelId);
+						tChannel.OnRecvComplete(e);
+						break;
+					}
+					case SocketAsyncOperation.Send:
+					{
+						TChannel tChannel = this.Get(result.ChannelId);
+						tChannel.OnSendComplete(e);
+						break;
+					}
+					default:
+						throw new ArgumentOutOfRangeException($"{e.LastOperation}");
+				}
 			}
-			this.NeedStartSend.Clear();
 		}
 		
 		public override bool IsDispose()
 		{
-			return this.ThreadSynchronizationContext == null;
+			return this.acceptor == null;
 		}
 	}
 }

+ 2 - 2
Unity/Assets/Scripts/Core/Module/Network/WChannel.cs

@@ -221,7 +221,7 @@ namespace ET
                     {
                         ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.KcpOpcodeIndex);
                         Type type = NetServices.Instance.GetType(opcode);
-                        message = MessageSerializeHelper.DeserializeFrom(opcode, type, memoryStream);
+                        message = MessageSerializeHelper.DeserializeFrom(type, memoryStream);
                         break;
                     }
                     case ServiceType.Inner:
@@ -229,7 +229,7 @@ namespace ET
                         actorId = BitConverter.ToInt64(memoryStream.GetBuffer(), Packet.ActorIdIndex);
                         ushort opcode = BitConverter.ToUInt16(memoryStream.GetBuffer(), Packet.OpcodeIndex);
                         Type type = NetServices.Instance.GetType(opcode);
-                        message = MessageSerializeHelper.DeserializeFrom(opcode, type, memoryStream);
+                        message = MessageSerializeHelper.DeserializeFrom(type, memoryStream);
                         break;
                     }
                 }

+ 3 - 1
Unity/Assets/Scripts/Core/Module/Network/WService.cs

@@ -46,7 +46,7 @@ namespace ET
             return channel;
         }
 
-        public override void Remove(long id)
+        public override void Remove(long id, int error = 0)
         {
             WChannel channel;
             if (!this.channels.TryGetValue(id, out channel))
@@ -54,6 +54,8 @@ namespace ET
                 return;
             }
 
+            channel.Error = error;
+
             this.channels.Remove(id);
             channel.Dispose();
         }

+ 6 - 0
Unity/Assets/Scripts/Core/Module/ObjectPool/ObjectPool.cs

@@ -36,6 +36,12 @@ namespace ET
                 queue = new Queue<object>();
                 pool.Add(type, queue);
             }
+
+            // 一种对象最大为1000个
+            if (queue.Count > 1000)
+            {
+                return;
+            }
             queue.Enqueue(obj);
         }
     }

+ 1 - 1
Unity/Assets/Scripts/Core/Module/Timer/TimerComponent.cs

@@ -274,7 +274,7 @@ namespace ET
         {
             if (tillTime < TimeHelper.ClientNow())
             {
-                Log.Warning($"new once time too small: {tillTime}");
+                Log.Error($"new once time too small: {tillTime}");
             }
 
             TimerAction timer = TimerAction.Create(TimerClass.OnceTimer, tillTime, type, args);

+ 0 - 2
Unity/Assets/Scripts/Core/TimerCoreCallbackId.cs

@@ -3,7 +3,5 @@
     public static class TimerCoreCallbackId
     {
         public const int CoroutineTimeout = 1;
-        public const int KServiceConnectTimer = 2;
-        public const int KServiceNextUpdateTimer = 3;
     }
 }