tanghai 7 лет назад
Родитель
Сommit
4d7af24340

+ 23 - 5
Unity/Assets/Scripts/Module/Message/Network/AChannel.cs

@@ -42,6 +42,29 @@ namespace ETModel
 				this.errorCallback -= value;
 			}
 		}
+		
+		private Action<Packet> readCallback;
+
+		public event Action<Packet> ReadCallback
+		{
+			add
+			{
+				this.readCallback += value;
+			}
+			remove
+			{
+				this.readCallback -= value;
+			}
+		}
+
+		protected void OnRead(Packet packet)
+		{
+			if (this.IsDisposed)
+			{
+				return;
+			}
+			this.readCallback?.Invoke(packet);
+		}
 
 		protected void OnError(int error)
 		{
@@ -66,11 +89,6 @@ namespace ETModel
 
 		public abstract void Send(List<byte[]> buffers);
 
-		/// <summary>
-		/// 接收消息
-		/// </summary>
-		public abstract Task<Packet> Recv();
-
 		public override void Dispose()
 		{
 			if (this.IsDisposed)

+ 4 - 48
Unity/Assets/Scripts/Module/Message/Network/KCP/KChannel.cs

@@ -34,8 +34,6 @@ namespace ETModel
 		private bool isConnected;
 		private readonly IPEndPoint remoteEndPoint;
 
-		private TaskCompletionSource<Packet> recvTcs;
-
 		private uint lastRecvTime;
 
 		private readonly byte[] cacheBytes = new byte[ushort.MaxValue];
@@ -202,31 +200,15 @@ namespace ETModel
 				this.recvBuffer.Write(sizeBuffer, 0, sizeBuffer.Length);
 				this.recvBuffer.Write(cacheBytes, 0, count);
 
-				if (this.recvTcs == null)
-				{
-					continue;
-				}
-
-				try
+				while (true)
 				{
-					bool isOK = this.parser.Parse();
-					if (!isOK)
+					if (!this.parser.Parse())
 					{
-						continue;
+						break;
 					}
 
 					Packet packet = this.parser.GetPacket();
-					var tcs = this.recvTcs;
-					this.recvTcs = null;
-					tcs.SetResult(packet);
-				}
-				catch (Exception e)
-				{
-					this.OnError(ErrorCode.ERR_PacketParserError);
-						
-					var tcs = this.recvTcs;
-					this.recvTcs = null;
-					tcs.SetException(e);
+					this.OnRead(packet);
 				}
 			}
 		}
@@ -274,31 +256,5 @@ namespace ETModel
 
 			Send(bytes, 0, size);
 		}
-
-		public override Task<Packet> Recv()
-		{
-			if (this.IsDisposed)
-			{
-				throw new Exception("TChannel已经被Dispose, 不能接收消息");
-			}
-
-			try
-			{
-				bool isOK = this.parser.Parse();
-				if (isOK)
-				{
-					Packet packet = this.parser.GetPacket();
-					return Task.FromResult(packet);
-				}
-
-				this.recvTcs = new TaskCompletionSource<Packet>();
-				return this.recvTcs.Task;
-			}
-			catch (Exception)
-			{
-				this.OnError(ErrorCode.ERR_PacketParserError);
-				throw;
-			}
-		}
 	}
 }

+ 4 - 51
Unity/Assets/Scripts/Module/Message/Network/TCP/TChannel.cs

@@ -18,7 +18,6 @@ namespace ETModel
 		private bool isSending;
 		private readonly PacketParser parser;
 		private bool isConnected;
-		private TaskCompletionSource<Packet> recvTcs;
 
 		/// <summary>
 		/// connect
@@ -75,8 +74,6 @@ namespace ETModel
 			}
 			
 			base.Dispose();
-
-			this.recvTcs = null;
 			this.tcpClient.Close();
 		}
 
@@ -199,33 +196,15 @@ namespace ETModel
 						return;
 					}
 
-					// 如果没有recv调用
-					if (this.recvTcs == null)
-					{
-						continue;
-					}
-
-					try
+					while (true)
 					{
-						bool isOK = this.parser.Parse();
-						if (!isOK)
+						if (!this.parser.Parse())
 						{
-							continue;
+							break;
 						}
 
 						Packet packet = this.parser.GetPacket();
-
-						var tcs = this.recvTcs;
-						this.recvTcs = null;
-						tcs.SetResult(packet);
-					}
-					catch (Exception e)
-					{
-						this.OnError(ErrorCode.ERR_PacketParserError);
-						
-						var tcs = this.recvTcs;
-						this.recvTcs = null;
-						tcs.SetException(e);
+						this.OnRead(packet);
 					}
 				}
 			}
@@ -243,31 +222,5 @@ namespace ETModel
 				this.OnError((int)SocketError.SocketError);
 			}
 		}
-
-		public override Task<Packet> Recv()
-		{
-			if (this.IsDisposed)
-			{
-				throw new Exception("TChannel已经被Dispose, 不能接收消息");
-			}
-
-			try
-			{
-				bool isOK = this.parser.Parse();
-				if (isOK)
-				{
-					Packet packet = this.parser.GetPacket();
-					return Task.FromResult(packet);
-				}
-
-				this.recvTcs = new TaskCompletionSource<Packet>();
-				return this.recvTcs.Task;
-			}
-			catch (Exception)
-			{
-				this.OnError(ErrorCode.ERR_PacketParserError);
-				throw;
-			}
-		}
 	}
 }

+ 6 - 0
Unity/Assets/Scripts/Module/Message/NetworkComponent.cs

@@ -87,6 +87,9 @@ namespace ETModel
 				session.Error = e;
 				this.Remove(session.Id);
 			};
+
+			channel.ReadCallback += (packet) => { session.OnRead(packet); };
+			
 			this.sessions.Add(session.Id, session);
 			return session;
 		}
@@ -124,6 +127,9 @@ namespace ETModel
 					session.Error = e;
 					this.Remove(session.Id);
 				};
+				
+				channel.ReadCallback += (packet) => { session.OnRead(packet); };
+				
 				this.sessions.Add(session.Id, session);
 				return session;
 			}

+ 7 - 47
Unity/Assets/Scripts/Module/Message/Session.cs

@@ -17,15 +17,6 @@ namespace ETModel
 		}
 	}
 
-	[ObjectSystem]
-	public class SessionStartSystem : StartSystem<Session>
-	{
-		public override void Start(Session self)
-		{
-			self.Start();
-		}
-	}
-
 	public sealed class Session : Entity
 	{
 		private static int RpcId { get; set; }
@@ -50,11 +41,6 @@ namespace ETModel
 			this.requestCallback.Clear();
 		}
 
-		public void Start()
-		{
-			this.StartRecv();
-		}
-
 		public override void Dispose()
 		{
 			if (this.IsDisposed)
@@ -93,41 +79,15 @@ namespace ETModel
 			}
 		}
 
-		private async void StartRecv()
+		public void OnRead(Packet packet)
 		{
-			long instanceId = this.InstanceId;
-			
-			while (true)
+			try
 			{
-				if (this.InstanceId != instanceId)
-				{
-					return;
-				}
-
-				Packet packet;
-				try
-				{
-					packet = await this.channel.Recv();
-
-					if (this.InstanceId != instanceId)
-					{
-						return;
-					}
-				}
-				catch (Exception e)
-				{
-					Log.Error(e);
-					return;
-				}
-				
-				try
-				{
-					this.Run(packet);
-				}
-				catch (Exception e)
-				{
-					Log.Error(e);
-				}
+				this.Run(packet);
+			}
+			catch (Exception e)
+			{
+				Log.Error(e);
 			}
 		}