Bladeren bron

TcpTransport调通

tanghai 2 jaren geleden
bovenliggende
commit
97f39cf1eb

+ 19 - 5
Unity/Assets/Scripts/Core/Network/IKcpTransport.cs

@@ -2,6 +2,7 @@
 using System.Net;
 using System.Net.Sockets;
 using System.Collections.Generic;
+using System.IO;
 using System.Runtime.InteropServices;
 
 namespace ET
@@ -87,14 +88,16 @@ namespace ET
         public TcpTransport(AddressFamily addressFamily)
         {
             this.tService = new TService(addressFamily, ServiceType.Outer);
+            this.tService.ErrorCallback = this.OnError;
+            this.tService.ReadCallback = this.OnRead;
         }
         
         public TcpTransport(IPEndPoint ipEndPoint)
         {
             this.tService = new TService(ipEndPoint, ServiceType.Outer);
-            this.tService.AcceptCallback += this.OnAccept;
-            this.tService.ErrorCallback += this.OnError;
-            this.tService.ReadCallback += this.OnRead;
+            this.tService.AcceptCallback = this.OnAccept;
+            this.tService.ErrorCallback = this.OnError;
+            this.tService.ReadCallback = this.OnRead;
         }
 
         private void OnAccept(long id, IPEndPoint ipEndPoint)
@@ -105,6 +108,7 @@ namespace ET
 
         private void OnError(long id, int error)
         {
+            Log.Error($"IKcpTransport error: {error}");
             this.idEndpoints.RemoveByKey(id);
         }
         
@@ -116,7 +120,16 @@ namespace ET
         public void Send(byte[] bytes, int index, int length, EndPoint endPoint)
         {
             long channelId = this.idEndpoints.GetKeyByValue(endPoint);
-            this.tService.Send(channelId, new MemoryBuffer(bytes, index, length));
+            if (channelId == 0)
+            {
+                channelId = IdGenerater.Instance.GenerateInstanceId();
+                this.tService.Create(channelId, endPoint.ToString());
+                this.idEndpoints.Add(channelId, endPoint);
+            }
+            MemoryBuffer memoryBuffer = this.tService.Fetch();
+            memoryBuffer.Write(bytes, index, length);
+            memoryBuffer.Seek(0, SeekOrigin.Begin);
+            this.tService.Send(channelId, memoryBuffer);
         }
 
         public int Recv(byte[] buffer, ref EndPoint endPoint)
@@ -132,9 +145,10 @@ namespace ET
             {
                 return 0;
             }
+            
             endPoint = channel.RemoteAddress;
             int count = memoryBuffer.Read(buffer);
-            memoryBuffer.Dispose();
+            this.tService.Recycle(memoryBuffer);
             return count;
         }
 

+ 0 - 1
Unity/Assets/Scripts/Core/Network/KChannel.cs

@@ -347,7 +347,6 @@ namespace ET
 				MemoryBuffer memoryBuffer = this.readMemory;
 				this.readMemory = null;
 				this.OnRead(memoryBuffer);
-				this.Service.Recycle(memoryBuffer);
 			}
 		}
 

+ 4 - 3
Unity/Assets/Scripts/Core/Network/KService.cs

@@ -50,14 +50,14 @@ namespace ET
         {
             this.ServiceType = serviceType;
             this.startTime = DateTime.UtcNow.Ticks;
-            this.Socket = new UdpTransport(ipEndPoint);
+            this.Socket = new TcpTransport(ipEndPoint);
         }
 
         public KService(AddressFamily addressFamily, ServiceType serviceType)
         {
             this.ServiceType = serviceType;
             this.startTime = DateTime.UtcNow.Ticks;
-            this.Socket = new UdpTransport(addressFamily);
+            this.Socket = new TcpTransport(addressFamily);
         }
 
         // 保存所有的channel
@@ -141,7 +141,6 @@ namespace ET
             while (this.Socket != null && this.Socket.Available() > 0)
             {
                 int messageLength = this.Socket.RecvNonAlloc(this.cache, ref this.ipEndPoint);
-
                 // 长度小于1,不是正常的消息
                 if (messageLength < 1)
                 {
@@ -471,6 +470,8 @@ namespace ET
             this.Recv();
 
             this.UpdateChannel(timeNow);
+            
+            this.Socket.Update();
         }
 
         private void CheckWaitAcceptChannel(uint timeNow)

+ 2 - 2
Unity/Assets/Scripts/Core/Network/PacketParser.cs

@@ -84,11 +84,11 @@ namespace ET
 
 						if (this.service.ServiceType == ServiceType.Inner)
 						{
-							memoryBuffer.Seek(Packet.MessageIndex, SeekOrigin.Begin);
+							memoryBuffer.Seek(Packet.ActorIdLength, SeekOrigin.Begin);
 						}
 						else
 						{
-							memoryBuffer.Seek(Packet.OpcodeLength, SeekOrigin.Begin);
+							memoryBuffer.Seek(0, SeekOrigin.Begin);
 						}
 
 						this.state = ParserState.PacketSize;

+ 27 - 4
Unity/Assets/Scripts/Core/Network/TChannel.cs

@@ -99,11 +99,33 @@ namespace ET
 				throw new Exception("TChannel已经被Dispose, 不能发送消息");
 			}
 			
+			switch (this.Service.ServiceType)
+			{
+				case ServiceType.Inner:
+				{
+					int messageSize = (int) (stream.Length - stream.Position);
+					if (messageSize > ushort.MaxValue * 16)
+					{
+						throw new Exception($"send packet too large: {stream.Length} {stream.Position}");
+					}
+
+					this.sendCache.WriteTo(0, messageSize);
+					this.sendBuffer.Write(this.sendCache, 0, PacketParser.InnerPacketSizeLength);
+					break;
+				}
+				case ServiceType.Outer:
+				{
+					ushort messageSize = (ushort) (stream.Length - stream.Position);
+					this.sendCache.WriteTo(0, messageSize);
+					this.sendBuffer.Write(this.sendCache, 0, PacketParser.OuterPacketSizeLength);
+					break;
+				}
+			}
+			
 			this.sendBuffer.Write(stream.GetBuffer(), (int)stream.Position, (int)(stream.Length - stream.Position));
 			
 			if (!this.isSending)
 			{
-				//this.StartSend();
 				this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartSend, ChannelId = this.Id});
 			}
 			
@@ -221,6 +243,10 @@ namespace ET
 				}
 				try
 				{
+					if (this.recvBuffer.Length == 0)
+					{
+						break;
+					}
 					bool ret = this.parser.Parse(out MemoryBuffer memoryBuffer);
 					if (!ret)
 					{
@@ -228,8 +254,6 @@ namespace ET
 					}
 					
 					this.OnRead(memoryBuffer);
-					
-					this.Service.Recycle(memoryBuffer);
 				}
 				catch (Exception ee)
 				{
@@ -276,7 +300,6 @@ namespace ET
 					{
 						sendSize = (int)this.sendBuffer.Length;
 					}
-					
 					this.outArgs.SetBuffer(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
 					
 					if (this.socket.SendAsync(this.outArgs))

+ 0 - 1
Unity/Assets/Scripts/Core/Network/TService.cs

@@ -186,7 +186,6 @@ namespace ET
 				}
 				
 				SocketAsyncEventArgs e = result.SocketAsyncEventArgs;
-
 				if (e == null)
 				{
 					switch (result.Op)

+ 0 - 1
Unity/Assets/Scripts/Core/Network/WChannel.cs

@@ -199,7 +199,6 @@ namespace ET
                     memoryBuffer.Seek(2, SeekOrigin.Begin);
                     Array.Copy(this.cache, 0, memoryBuffer.GetBuffer(), 0, receiveCount);
                     this.OnRead(memoryBuffer);
-                    this.Service.Recycle(memoryBuffer);
                 }
             }
             catch (Exception e)

+ 36 - 13
Unity/Assets/Scripts/Hotfix/Server/Module/Router/RouterComponentSystem.cs

@@ -13,14 +13,15 @@ namespace ET.Server
         [EntitySystem]
         private static void Awake(this RouterComponent self, IPEndPoint outerAddress, string innerIP)
         {
-            self.OuterSocket = new UdpTransport(outerAddress);
-            self.InnerSocket = new UdpTransport(new IPEndPoint(IPAddress.Parse(innerIP), 0));
+            self.OuterUdp = new UdpTransport(outerAddress);
+            self.OuterTcp = new TcpTransport(outerAddress);
+            self.InnerSocket = new TcpTransport(new IPEndPoint(IPAddress.Parse(innerIP), 0));
         }
         
         [EntitySystem]
         private static void Destroy(this RouterComponent self)
         {
-            self.OuterSocket.Dispose();
+            self.OuterUdp.Dispose();
             self.InnerSocket.Dispose();
             self.OuterNodes.Clear();
             self.IPEndPoint = null;
@@ -29,8 +30,12 @@ namespace ET.Server
         [EntitySystem]
         private static void Update(this RouterComponent self)
         {
+            self.OuterUdp.Update();
+            self.OuterTcp.Update();
+            self.InnerSocket.Update();
             long timeNow = TimeInfo.Instance.ClientNow();
-            self.RecvOuter(timeNow);
+            self.RecvOuterUdp(timeNow);
+            self.RecvOuterTcp(timeNow);
             self.RecvInner(timeNow);
 
             // 每秒钟检查一次
@@ -46,15 +51,32 @@ namespace ET.Server
             IPEndPoint ipEndPoint = (IPEndPoint) self.IPEndPoint;
             return new IPEndPoint(ipEndPoint.Address, ipEndPoint.Port);
         }
+        
+        // 接收tcp消息
+        private static void RecvOuterTcp(this RouterComponent self, long timeNow)
+        {
+            while (self.OuterTcp != null && self.OuterTcp.Available() > 0)
+            {
+                try
+                {
+                    int messageLength = self.OuterTcp.Recv(self.Cache, ref self.IPEndPoint);
+                    self.RecvOuterHandler(messageLength, timeNow);
+                }
+                catch (Exception e)
+                {
+                    Log.Error(e);
+                }
+            }
+        }
 
         // 接收udp消息
-        private static void RecvOuter(this RouterComponent self, long timeNow)
+        private static void RecvOuterUdp(this RouterComponent self, long timeNow)
         {
-            while (self.OuterSocket != null && self.OuterSocket.Available() > 0)
+            while (self.OuterUdp != null && self.OuterUdp.Available() > 0)
             {
                 try
                 {
-                    int messageLength = self.OuterSocket.Recv(self.Cache, ref self.IPEndPoint);
+                    int messageLength = self.OuterUdp.Recv(self.Cache, ref self.IPEndPoint);
                     self.RecvOuterHandler(messageLength, timeNow);
                 }
                 catch (Exception e)
@@ -113,6 +135,7 @@ namespace ET.Server
                 try
                 {
                     int messageLength = self.InnerSocket.Recv(self.Cache, ref self.IPEndPoint);
+                    
                     self.RecvInnerHandler(messageLength, timeNow);
                 }
                 catch (Exception e)
@@ -260,7 +283,7 @@ namespace ET.Server
                     self.Cache.WriteTo(0, KcpProtocalType.RouterACK);
                     self.Cache.WriteTo(1, routerNode.InnerConn);
                     self.Cache.WriteTo(5, routerNode.OuterConn);
-                    self.OuterSocket.Send(self.Cache, 0, 9, routerNode.SyncIpEndPoint);
+                    self.OuterUdp.Send(self.Cache, 0, 9, routerNode.SyncIpEndPoint);
 
                     if (!routerNode.CheckOuterCount(timeNow))
                     {
@@ -279,7 +302,7 @@ namespace ET.Server
 
                     uint outerConn = BitConverter.ToUInt32(self.Cache, 1); // remote
                     uint innerConn = BitConverter.ToUInt32(self.Cache, 5);
-
+                    
                     if (!self.OuterNodes.TryGetValue(outerConn, out RouterNode kcpRouter))
                     {
                         Log.Warning($"kcp router syn not found outer nodes: {outerConn} {innerConn}");
@@ -462,7 +485,7 @@ namespace ET.Server
                     self.Cache.WriteTo(1, kcpRouterNode.InnerConn);
                     self.Cache.WriteTo(5, kcpRouterNode.OuterConn);
                     Log.Info($"kcp router RouterAck: {outerConn} {innerConn} {kcpRouterNode.SyncIpEndPoint}");
-                    self.OuterSocket.Send(self.Cache, 0, 9, kcpRouterNode.SyncIpEndPoint);
+                    self.OuterTcp.Send(self.Cache, 0, 9, kcpRouterNode.SyncIpEndPoint);
                     break;
                 }
 
@@ -484,7 +507,7 @@ namespace ET.Server
                     kcpRouterNode.LastRecvInnerTime = timeNow;
                     // 转发出去
                     Log.Info($"kcp router ack: {outerConn} {innerConn} {kcpRouterNode.OuterIpEndPoint}");
-                    self.OuterSocket.Send(self.Cache, 0, messageLength, kcpRouterNode.OuterIpEndPoint);
+                    self.OuterTcp.Send(self.Cache, 0, messageLength, kcpRouterNode.OuterIpEndPoint);
                     break;
                 }
                 case KcpProtocalType.FIN: // 断开
@@ -519,7 +542,7 @@ namespace ET.Server
 
                     kcpRouterNode.LastRecvInnerTime = timeNow;
                     Log.Info($"kcp router inner fin: {outerConn} {innerConn} {kcpRouterNode.OuterIpEndPoint}");
-                    self.OuterSocket.Send(self.Cache, 0, messageLength, kcpRouterNode.OuterIpEndPoint);
+                    self.OuterTcp.Send(self.Cache, 0, messageLength, kcpRouterNode.OuterIpEndPoint);
 
                     break;
                 }
@@ -555,7 +578,7 @@ namespace ET.Server
                     }
 
                     kcpRouterNode.LastRecvInnerTime = timeNow;
-                    self.OuterSocket.Send(self.Cache, 0, messageLength, kcpRouterNode.OuterIpEndPoint);
+                    self.OuterTcp.Send(self.Cache, 0, messageLength, kcpRouterNode.OuterIpEndPoint);
                     break;
                 }
             }

+ 2 - 1
Unity/Assets/Scripts/Model/Server/Module/Router/RouterComponent.cs

@@ -9,7 +9,8 @@ namespace ET.Server
     [ComponentOf(typeof(Scene))]
     public class RouterComponent: Entity, IAwake<IPEndPoint, string>, IDestroy, IUpdate
     {
-        public IKcpTransport OuterSocket;
+        public IKcpTransport OuterUdp;
+        public IKcpTransport OuterTcp;
         public IKcpTransport InnerSocket;
         public EndPoint IPEndPoint = new IPEndPoint(IPAddress.Any, 0);