Просмотр исходного кода

外网设置tcp消息长为2字节,内网设置为4字节,但是内网消息长度限制为ushort.Max * 16, 消息过长应该从逻辑层避免

tanghai 7 лет назад
Родитель
Сommit
157b416cee

+ 5 - 8
Server/Hotfix/Module/Benchmark/BenchmarkComponentSystem.cs

@@ -1,5 +1,6 @@
 using System;
 using System.Net;
+using System.Threading.Tasks;
 using ETModel;
 
 namespace ETHotfix
@@ -21,9 +22,9 @@ namespace ETHotfix
 			{
 				IPEndPoint ipEndPoint = NetworkHelper.ToIPEndPoint(address);
 				NetOuterComponent networkComponent = Game.Scene.GetComponent<NetOuterComponent>();
-				for (int i = 0; i < 1000; i++)
+				for (int i = 0; i < 2000; i++)
 				{
-					self.TestAsync(networkComponent, ipEndPoint, i).Coroutine();
+					self.TestAsync(networkComponent, ipEndPoint, i);
 				}
 			}
 			catch (Exception e)
@@ -32,7 +33,7 @@ namespace ETHotfix
 			}
 		}
 
-		public static async ETVoid TestAsync(this BenchmarkComponent self, NetOuterComponent networkComponent, IPEndPoint ipEndPoint, int j)
+		public static async void TestAsync(this BenchmarkComponent self, NetOuterComponent networkComponent, IPEndPoint ipEndPoint, int j)
 		{
 			try
 			{
@@ -46,17 +47,13 @@ namespace ETHotfix
 					}
 				}
 			}
-			catch (RpcException e)
-			{
-				Log.Error(e);
-			}
 			catch (Exception e)
 			{
 				Log.Error(e);
 			}
 		}
 
-		public static async ETTask Send(this BenchmarkComponent self, Session session, int j)
+		public static async Task Send(this BenchmarkComponent self, Session session, int j)
 		{
 			try
 			{

+ 4 - 3
Server/Hotfix/Module/Message/NetInnerComponentSystem.cs

@@ -1,4 +1,5 @@
-using ETModel;
+using System.Net;
+using ETModel;
 
 namespace ETHotfix
 {
@@ -43,7 +44,7 @@ namespace ETHotfix
 	{
 		public static void Awake(this NetInnerComponent self)
 		{
-			self.Awake(NetworkProtocol.TCP);
+			self.Awake(NetworkProtocol.TCP, Packet.PacketSizeLength4);
 			self.MessagePacker = new MongoPacker();
 			self.MessageDispatcher = new InnerMessageDispatcher();
 			self.AppType = StartConfigComponent.Instance.StartConfig.AppType;
@@ -51,7 +52,7 @@ namespace ETHotfix
 
 		public static void Awake(this NetInnerComponent self, string address)
 		{
-			self.Awake(NetworkProtocol.TCP, address);
+			self.Awake(NetworkProtocol.TCP, address, Packet.PacketSizeLength4);
 			self.MessagePacker = new MongoPacker();
 			self.MessageDispatcher = new InnerMessageDispatcher();
 			self.AppType = StartConfigComponent.Instance.StartConfig.AppType;

+ 3 - 3
Unity/Assets/Model/Module/Message/Network/AChannel.cs

@@ -14,7 +14,7 @@ namespace ETModel
 	{
 		public ChannelType ChannelType { get; }
 
-		protected AService service;
+		public AService Service { get; }
 
 		public abstract MemoryStream Stream { get; }
 		
@@ -65,7 +65,7 @@ namespace ETModel
 		{
 			this.Id = IdGenerater.GenerateId();
 			this.ChannelType = channelType;
-			this.service = service;
+			this.Service = service;
 		}
 
 		public abstract void Start();
@@ -81,7 +81,7 @@ namespace ETModel
 
 			base.Dispose();
 
-			this.service.Remove(this.Id);
+			this.Service.Remove(this.Id);
 		}
 	}
 }

+ 3 - 3
Unity/Assets/Model/Module/Message/Network/KCP/KChannel.cs

@@ -1,4 +1,4 @@
-using System;
+using System;
 using System.Collections.Generic;
 using System.IO;
 using System.Net;
@@ -132,7 +132,7 @@ namespace ETModel
 
 		private KService GetService()
 		{
-			return (KService)this.service;
+			return (KService)this.Service;
 		}
 
 		public void HandleConnnect(uint remoteConn)
@@ -250,7 +250,7 @@ namespace ETModel
 					return;
 				}
 				
-				if (timeNow - this.lastRecvTime < 200)
+				if (timeNow - this.lastRecvTime < 500)
 				{
 					return;
 				}

+ 20 - 11
Unity/Assets/Model/Module/Message/Network/TCP/PacketParser.cs

@@ -8,12 +8,11 @@ namespace ETModel
 		PacketSize,
 		PacketBody
 	}
-
+	
 	public static class Packet
 	{
-		public static int SizeLength = 4;
-		public const int MinSize = 3;
-		public const int MaxSize = int.MaxValue;
+		public const int PacketSizeLength2 = 2;
+		public const int PacketSizeLength4 = 4;
 		public const int FlagIndex = 0;
 		public const int OpcodeIndex = 1;
 		public const int MessageIndex = 3;
@@ -26,9 +25,11 @@ namespace ETModel
 		private ParserState state;
 		public MemoryStream memoryStream;
 		private bool isOK;
+		private readonly int packetSizeLength;
 
-		public PacketParser(CircularBuffer buffer, MemoryStream memoryStream)
+		public PacketParser(int packetSizeLength, CircularBuffer buffer, MemoryStream memoryStream)
 		{
+			this.packetSizeLength = packetSizeLength;
 			this.buffer = buffer;
 			this.memoryStream = memoryStream;
 		}
@@ -46,24 +47,32 @@ namespace ETModel
 				switch (this.state)
 				{
 					case ParserState.PacketSize:
-						if (this.buffer.Length < Packet.SizeLength)
+						if (this.buffer.Length < this.packetSizeLength)
 						{
 							finish = true;
 						}
 						else
 						{
-							this.buffer.Read(this.memoryStream.GetBuffer(), 0, Packet.SizeLength);
+							this.buffer.Read(this.memoryStream.GetBuffer(), 0, this.packetSizeLength);
 							
-							switch (Packet.SizeLength)
+							switch (this.packetSizeLength)
 							{
-								case 4:
+								case Packet.PacketSizeLength4:
 									this.packetSize = BitConverter.ToInt32(this.memoryStream.GetBuffer(), 0);
+									if (this.packetSize > ushort.MaxValue * 16 || this.packetSize < 3)
+									{
+										throw new Exception($"recv packet size error: {this.packetSize}");
+									}
 									break;
-								case 2:
+								case Packet.PacketSizeLength2:
 									this.packetSize = BitConverter.ToUInt16(this.memoryStream.GetBuffer(), 0);
+									if (this.packetSize > ushort.MaxValue || this.packetSize < 3)
+									{
+										throw new Exception($"recv packet size error: {this.packetSize}");
+									}
 									break;
 								default:
-									throw new Exception("packet size must be 2 or 4!");
+									throw new Exception("packet size byte count must be 2 or 4!");
 							}
 
 							this.state = ParserState.PacketBody;

+ 41 - 18
Unity/Assets/Model/Module/Message/Network/TCP/TChannel.cs

@@ -1,7 +1,10 @@
 using System;
+using System.Collections.Generic;
 using System.IO;
+using System.Linq;
 using System.Net;
 using System.Net.Sockets;
+using Microsoft.IO;
 
 namespace ETModel
 {
@@ -27,15 +30,17 @@ namespace ETModel
 
 		private readonly PacketParser parser;
 
-		private readonly byte[] cache = new byte[Packet.SizeLength];
+		private readonly byte[] packetSizeCache;
 		
 		public TChannel(IPEndPoint ipEndPoint, TService service): base(service, ChannelType.Connect)
 		{
-			this.memoryStream = this.GetService().MemoryStreamManager.GetStream("message", ushort.MaxValue);
+			int packetSize = service.PacketSizeLength;
+			this.packetSizeCache = new byte[packetSize];
+			this.memoryStream = service.MemoryStreamManager.GetStream("message", ushort.MaxValue);
 			
 			this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
 			this.socket.NoDelay = true;
-			this.parser = new PacketParser(this.recvBuffer, this.memoryStream);
+			this.parser = new PacketParser(packetSize, this.recvBuffer, this.memoryStream);
 			this.innArgs.Completed += this.OnComplete;
 			this.outArgs.Completed += this.OnComplete;
 
@@ -47,11 +52,13 @@ namespace ETModel
 		
 		public TChannel(Socket socket, TService service): base(service, ChannelType.Accept)
 		{
-			this.memoryStream = this.GetService().MemoryStreamManager.GetStream("message", ushort.MaxValue);
+			int packetSize = service.PacketSizeLength;
+			this.packetSizeCache = new byte[packetSize];
+			this.memoryStream = service.MemoryStreamManager.GetStream("message", ushort.MaxValue);
 			
 			this.socket = socket;
 			this.socket.NoDelay = true;
-			this.parser = new PacketParser(this.recvBuffer, this.memoryStream);
+			this.parser = new PacketParser(packetSize, this.recvBuffer, this.memoryStream);
 			this.innArgs.Completed += this.OnComplete;
 			this.outArgs.Completed += this.OnComplete;
 
@@ -81,7 +88,7 @@ namespace ETModel
 		
 		private TService GetService()
 		{
-			return (TService)this.service;
+			return (TService)this.Service;
 		}
 
 		public override MemoryStream Stream
@@ -116,19 +123,27 @@ namespace ETModel
 				throw new Exception("TChannel已经被Dispose, 不能发送消息");
 			}
 
-			switch (Packet.SizeLength)
+			switch (this.GetService().PacketSizeLength)
 			{
-				case 4:
-					this.cache.WriteTo(0, (int) stream.Length);
+				case Packet.PacketSizeLength4:
+					if (stream.Length > ushort.MaxValue * 16)
+					{
+						throw new Exception($"send packet too large: {stream.Length}");
+					}
+					this.packetSizeCache.WriteTo(0, (int) stream.Length);
 					break;
-				case 2:
-					this.cache.WriteTo(0, (ushort) stream.Length);
+				case Packet.PacketSizeLength2:
+					if (stream.Length > ushort.MaxValue)
+					{
+						throw new Exception($"send packet too large: {stream.Length}");
+					}
+					this.packetSizeCache.WriteTo(0, (ushort) stream.Length);
 					break;
 				default:
 					throw new Exception("packet size must be 2 or 4!");
 			}
 
-			this.sendBuffer.Write(this.cache, 0, this.cache.Length);
+			this.sendBuffer.Write(this.packetSizeCache, 0, this.packetSizeCache.Length);
 			this.sendBuffer.Write(stream);
 
 			this.GetService().MarkNeedStartSend(this.Id);
@@ -245,19 +260,27 @@ namespace ETModel
 			// 收到消息回调
 			while (true)
 			{
-				if (!this.parser.Parse())
+				try
 				{
-					break;
+					if (!this.parser.Parse())
+					{
+						break;
+					}
+				}
+				catch (Exception ee)
+				{
+					Log.Error(ee);
+					this.OnError(ErrorCode.ERR_SocketError);
+					return;
 				}
 
-				MemoryStream stream = this.parser.GetPacket();
 				try
 				{
-					this.OnRead(stream);
+					this.OnRead(this.parser.GetPacket());
 				}
-				catch (Exception exception)
+				catch (Exception ee)
 				{
-					Log.Error(exception);
+					Log.Error(ee);
 				}
 			}
 

+ 7 - 3
Unity/Assets/Model/Module/Message/Network/TCP/TService.cs

@@ -18,11 +18,14 @@ namespace ETModel
 		
 		public HashSet<long> needStartSendChannel = new HashSet<long>();
 		
+		public int PacketSizeLength { get; }
+		
 		/// <summary>
 		/// 即可做client也可做server
 		/// </summary>
-		public TService(IPEndPoint ipEndPoint, Action<AChannel> acceptCallback)
+		public TService(int packetSizeLength, IPEndPoint ipEndPoint, Action<AChannel> acceptCallback)
 		{
+			this.PacketSizeLength = packetSizeLength;
 			this.AcceptCallback += acceptCallback;
 			
 			this.acceptor = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
@@ -35,8 +38,9 @@ namespace ETModel
 			this.AcceptAsync();
 		}
 
-		public TService()
+		public TService(int packetSizeLength)
 		{
+			this.PacketSizeLength = packetSizeLength;
 		}
 		
 		public override void Dispose()
@@ -66,7 +70,7 @@ namespace ETModel
 					OneThreadSynchronizationContext.Instance.Post(this.OnAcceptComplete, e);
 					break;
 				default:
-					throw new Exception($"socket error: {e.LastOperation}");
+					throw new Exception($"socket accept error: {e.LastOperation}");
 			}
 		}
 		

+ 1 - 1
Unity/Assets/Model/Module/Message/Network/WebSocket/WChannel.cs

@@ -84,7 +84,7 @@ namespace ETModel
         
         private WService GetService()
         {
-            return (WService)this.service;
+            return (WService)this.Service;
         }
 
         public async ETVoid ConnectAsync(string url)

+ 16 - 23
Unity/Assets/Model/Module/Message/NetworkComponent.cs

@@ -9,7 +9,7 @@ namespace ETModel
 	{
 		public AppType AppType;
 		
-		private AService Service;
+		protected AService Service;
 
 		private readonly Dictionary<long, Session> sessions = new Dictionary<long, Session>();
 
@@ -17,30 +17,23 @@ namespace ETModel
 
 		public IMessageDispatcher MessageDispatcher { get; set; }
 
-		public void Awake(NetworkProtocol protocol)
+		public void Awake(NetworkProtocol protocol, int packetSize = Packet.PacketSizeLength2)
 		{
-			try
-			{
-				switch (protocol)
-				{
-					case NetworkProtocol.KCP:
-						this.Service = new KService() {Parent = this};
-						break;
-					case NetworkProtocol.TCP:
-						this.Service = new TService() {Parent = this};
-						break;
-					case NetworkProtocol.WebSocket:
-						this.Service = new WService() {Parent = this};
-						break;
-				}
-			}
-			catch (Exception e)
+			switch (protocol)
 			{
-				throw new Exception($"{e}");
+				case NetworkProtocol.KCP:
+					this.Service = new KService() { Parent = this };
+					break;
+				case NetworkProtocol.TCP:
+					this.Service = new TService(packetSize) { Parent = this };
+					break;
+				case NetworkProtocol.WebSocket:
+					this.Service = new WService() { Parent = this };
+					break;
 			}
 		}
 
-		public void Awake(NetworkProtocol protocol, string address)
+		public void Awake(NetworkProtocol protocol, string address, int packetSize = Packet.PacketSizeLength2)
 		{
 			try
 			{
@@ -49,15 +42,15 @@ namespace ETModel
 				{
 					case NetworkProtocol.KCP:
 						ipEndPoint = NetworkHelper.ToIPEndPoint(address);
-						this.Service = new KService(ipEndPoint, this.OnAccept) {Parent = this};
+						this.Service = new KService(ipEndPoint, this.OnAccept) { Parent = this };
 						break;
 					case NetworkProtocol.TCP:
 						ipEndPoint = NetworkHelper.ToIPEndPoint(address);
-						this.Service = new TService(ipEndPoint, this.OnAccept) {Parent = this};
+						this.Service = new TService(packetSize, ipEndPoint, this.OnAccept) { Parent = this };
 						break;
 					case NetworkProtocol.WebSocket:
 						string[] prefixs = address.Split(';');
-						this.Service = new WService(prefixs, this.OnAccept) {Parent = this};
+						this.Service = new WService(prefixs, this.OnAccept) { Parent = this };
 						break;
 				}
 			}

+ 9 - 14
Unity/Assets/Model/Module/Message/Session.cs

@@ -4,6 +4,7 @@ using System.IO;
 using System.Linq;
 using System.Net;
 using System.Threading;
+using System.Threading.Tasks;
 
 namespace ETModel
 {
@@ -47,7 +48,6 @@ namespace ETModel
 		public void Awake(AChannel aChannel)
 		{
 			this.channel = aChannel;
-			this.channel.Parent = this;
 			this.requestCallback.Clear();
 			long id = this.Id;
 			channel.ErrorCallback += (c, e) =>
@@ -65,7 +65,7 @@ namespace ETModel
 			}
 
 			this.Network.Remove(this.Id);
-			
+
 			base.Dispose();
 			
 			foreach (Action<IResponse> action in this.requestCallback.Values.ToArray())
@@ -80,6 +80,7 @@ namespace ETModel
 			//}
 			
 			this.channel.Dispose();
+			
 			this.requestCallback.Clear();
 		}
 
@@ -181,10 +182,10 @@ namespace ETModel
 			action(response);
 		}
 
-		public ETTask<IResponse> Call(IRequest request)
+		public Task<IResponse> Call(IRequest request)
 		{
 			int rpcId = ++RpcId;
-			var tcs = new ETTaskCompletionSource<IResponse>();
+			var tcs = new TaskCompletionSource<IResponse>();
 
 			this.requestCallback[rpcId] = (response) =>
 			{
@@ -195,11 +196,11 @@ namespace ETModel
 						throw new RpcException(response.Error, response.Message);
 					}
 
-					tcs.TrySetResult(response);
+					tcs.SetResult(response);
 				}
 				catch (Exception e)
 				{
-					tcs.TrySetException(new Exception($"Rpc Error: {request.GetType().FullName}", e));
+					tcs.SetException(new Exception($"Rpc Error: {request.GetType().FullName}", e));
 				}
 			};
 
@@ -208,10 +209,10 @@ namespace ETModel
 			return tcs.Task;
 		}
 
-		public ETTask<IResponse> Call(IRequest request, CancellationToken cancellationToken)
+		public Task<IResponse> Call(IRequest request, CancellationToken cancellationToken)
 		{
 			int rpcId = ++RpcId;
-			var tcs = new ETTaskCompletionSource<IResponse>();
+			var tcs = new TaskCompletionSource<IResponse>();
 
 			this.requestCallback[rpcId] = (response) =>
 			{
@@ -286,12 +287,6 @@ namespace ETModel
 			stream.SetLength(Packet.MessageIndex);
 			this.Network.MessagePacker.SerializeTo(message, stream);
 			stream.Seek(0, SeekOrigin.Begin);
-
-			if (stream.Length > ushort.MaxValue)
-			{
-				Log.Error($"message too large: {stream.Length}, opcode: {opcode}");
-				return;
-			}
 			
 			this.byteses[0][0] = flag;
 			this.byteses[1].WriteTo(0, opcode);