Просмотр исходного кода

抽象出IKcpTransport接口,底层可以用UDP TCP跟Websocket

tanghai 2 лет назад
Родитель
Сommit
3b265aaa29

+ 145 - 0
Unity/Assets/Scripts/Core/Network/IKcpTransport.cs

@@ -0,0 +1,145 @@
+using System;
+using System.Net;
+using System.Net.Sockets;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+
+namespace ET
+{
+    public interface IKcpTransport: IDisposable
+    {
+        void Send(byte[] bytes, int index, int length, EndPoint endPoint);
+        int Recv(byte[] buffer, ref EndPoint endPoint);
+        int Available();
+        void Update();
+    }
+
+    public class UdpTransport: IKcpTransport
+    {
+        private readonly Socket socket;
+
+        public UdpTransport(AddressFamily addressFamily)
+        {
+            this.socket = new Socket(addressFamily, SocketType.Dgram, ProtocolType.Udp);
+            NetworkHelper.SetSioUdpConnReset(this.socket);
+        }
+        
+        public UdpTransport(IPEndPoint ipEndPoint)
+        {
+            this.socket = new Socket(ipEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
+            if (!RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
+            {
+                this.socket.SendBufferSize = Kcp.OneM * 64;
+                this.socket.ReceiveBufferSize = Kcp.OneM * 64;
+            }
+            
+            try
+            {
+                this.socket.Bind(ipEndPoint);
+            }
+            catch (Exception e)
+            {
+                throw new Exception($"bind error: {ipEndPoint}", e);
+            }
+
+            NetworkHelper.SetSioUdpConnReset(this.socket);
+        }
+        
+        public void Send(byte[] bytes, int index, int length, EndPoint endPoint)
+        {
+            this.socket.SendTo(bytes, index, length, SocketFlags.None, endPoint);
+        }
+
+        public int Recv(byte[] buffer, ref EndPoint endPoint)
+        {
+            return this.socket.ReceiveFrom_NonAlloc(buffer, ref endPoint);
+        }
+
+        public int Available()
+        {
+            return this.socket.Available;
+        }
+
+        public void Update()
+        {
+        }
+
+        public void Dispose()
+        {
+            this.socket?.Dispose();
+        }
+    }
+
+    public class TcpTransport: IKcpTransport
+    {
+        private readonly TService tService;
+
+        private readonly DoubleMap<long, EndPoint> idEndpoints = new();
+
+        private readonly Queue<(long, MemoryBuffer)> channelRecvDatas = new();
+        
+        public TcpTransport(AddressFamily addressFamily)
+        {
+            this.tService = new TService(addressFamily, ServiceType.Outer);
+        }
+        
+        public TcpTransport(IPEndPoint ipEndPoint)
+        {
+            this.tService = new TService(ipEndPoint, ServiceType.Outer);
+            this.tService.AcceptCallback += this.OnAccept;
+            this.tService.ErrorCallback += this.OnError;
+            this.tService.ReadCallback += this.OnRead;
+        }
+
+        private void OnAccept(long id, IPEndPoint ipEndPoint)
+        {
+            TChannel channel = this.tService.Get(id);
+            this.idEndpoints.Add(id, channel.RemoteAddress);
+        }
+
+        private void OnError(long id, int error)
+        {
+            this.idEndpoints.RemoveByKey(id);
+        }
+        
+        private void OnRead(long id, MemoryBuffer memoryBuffer)
+        {
+            channelRecvDatas.Enqueue((id, memoryBuffer));
+        }
+        
+        public void Send(byte[] bytes, int index, int length, EndPoint endPoint)
+        {
+            long channelId = this.idEndpoints.GetKeyByValue(endPoint);
+            this.tService.Send(channelId, new MemoryBuffer(bytes, index, length));
+        }
+
+        public int Recv(byte[] buffer, ref EndPoint endPoint)
+        {
+            (long channelId, MemoryBuffer memoryBuffer) = this.channelRecvDatas.Dequeue();
+            TChannel channel = this.tService.Get(channelId);
+            if (channel == null)
+            {
+                return 0;
+            }
+            endPoint = channel.RemoteAddress;
+            int count = memoryBuffer.Read(buffer);
+            memoryBuffer.Dispose();
+            return count;
+        }
+
+        public int Available()
+        {
+            return this.channelRecvDatas.Count;
+        }
+
+        public void Update()
+        {
+            this.tService.Update();
+        }
+
+        public void Dispose()
+        {
+            this.tService?.Dispose();
+        }
+    }
+}

+ 11 - 0
Unity/Assets/Scripts/Core/Network/IKcpTransport.cs.meta

@@ -0,0 +1,11 @@
+fileFormatVersion: 2
+guid: c8f80bcf0bdf19f4c98ddad194e155a4
+MonoImporter:
+  externalObjects: {}
+  serializedVersion: 2
+  defaultReferences: []
+  executionOrder: 0
+  icon: {instanceID: 0}
+  userData: 
+  assetBundleName: 
+  assetBundleVariant: 

+ 2 - 2
Unity/Assets/Scripts/Core/Network/KChannel.cs

@@ -200,7 +200,7 @@ namespace ET
 				buffer.WriteTo(0, KcpProtocalType.SYN);
 				buffer.WriteTo(1, this.LocalConn);
 				buffer.WriteTo(5, this.RemoteConn);
-				this.Service.Socket.SendTo(buffer, 0, 9, SocketFlags.None, this.RemoteAddress);
+				this.Service.Socket.Send(buffer, 0, 9, this.RemoteAddress);
 				// 这里很奇怪 调用socket.LocalEndPoint会动到this.RemoteAddressNonAlloc里面的temp,这里就不仔细研究了
 				Log.Info($"kchannel connect {this.LocalConn} {this.RemoteConn} {this.RealAddress}");
 
@@ -374,7 +374,7 @@ namespace ET
 				bytes.WriteTo(0, KcpProtocalType.MSG);
 				// 每个消息头部写下该channel的id;
 				bytes.WriteTo(1, this.LocalConn);
-				this.Service.Socket.SendTo(bytes, 0, count + 5, SocketFlags.None, this.RemoteAddress);
+				this.Service.Socket.Send(bytes, 0, count + 5, this.RemoteAddress);
 			}
 			catch (Exception e)
 			{

+ 14 - 33
Unity/Assets/Scripts/Core/Network/KService.cs

@@ -44,44 +44,25 @@ namespace ET
             }
         }
 
-        public Socket Socket;
+        public IKcpTransport Socket;
 
         public KService(IPEndPoint ipEndPoint, ServiceType serviceType)
         {
             this.ServiceType = serviceType;
             this.startTime = DateTime.UtcNow.Ticks;
-            this.Socket = new Socket(ipEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
-            if (!RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
-            {
-                this.Socket.SendBufferSize = Kcp.OneM * 64;
-                this.Socket.ReceiveBufferSize = Kcp.OneM * 64;
-            }
-            
-
-            try
-            {
-                this.Socket.Bind(ipEndPoint);
-            }
-            catch (Exception e)
-            {
-                throw new Exception($"bind error: {ipEndPoint}", e);
-            }
-
-            NetworkHelper.SetSioUdpConnReset(this.Socket);
+            this.Socket = new UdpTransport(ipEndPoint);
         }
 
         public KService(AddressFamily addressFamily, ServiceType serviceType)
         {
             this.ServiceType = serviceType;
             this.startTime = DateTime.UtcNow.Ticks;
-            this.Socket = new Socket(addressFamily, SocketType.Dgram, ProtocolType.Udp);
-
-            NetworkHelper.SetSioUdpConnReset(this.Socket);
+            this.Socket = new UdpTransport(addressFamily);
         }
 
         // 保存所有的channel
-        private readonly Dictionary<long, KChannel> localConnChannels = new Dictionary<long, KChannel>();
-        private readonly Dictionary<long, KChannel> waitAcceptChannels = new Dictionary<long, KChannel>();
+        private readonly Dictionary<long, KChannel> localConnChannels = new();
+        private readonly Dictionary<long, KChannel> waitAcceptChannels = new();
 
         private readonly byte[] cache = new byte[2048];
         
@@ -90,14 +71,14 @@ namespace ET
 
         
 
-        private readonly List<long> cacheIds = new List<long>();
+        private readonly List<long> cacheIds = new();
         
 
         // 下帧要更新的channel
-        private readonly HashSet<long> updateIds = new HashSet<long>();
+        private readonly HashSet<long> updateIds = new();
         // 下次时间更新的channel
         private readonly NativeCollection.MultiMap<long, long> timeId = new();
-        private readonly List<long> timeOutTime = new List<long>();
+        private readonly List<long> timeOutTime = new();
         // 记录最小时间,不用每次都去MultiMap取第一个值
         private long minTime;
 
@@ -126,7 +107,7 @@ namespace ET
                 this.Remove(channelId);
             }
 
-            this.Socket.Close();
+            this.Socket.Dispose();
             this.Socket = null;
         }
 
@@ -157,9 +138,9 @@ namespace ET
                 return;
             }
 
-            while (this.Socket != null && this.Socket.Available > 0)
+            while (this.Socket != null && this.Socket.Available() > 0)
             {
-                int messageLength = this.Socket.ReceiveFrom_NonAlloc(this.cache, ref this.ipEndPoint);
+                int messageLength = this.Socket.Recv(this.cache, ref this.ipEndPoint);
 
                 // 长度小于1,不是正常的消息
                 if (messageLength < 1)
@@ -225,7 +206,7 @@ namespace ET
                                 buffer.WriteTo(1, kChannel.LocalConn);
                                 buffer.WriteTo(5, kChannel.RemoteConn);
                                 buffer.WriteTo(9, connectId);
-                                this.Socket.SendTo(buffer, 0, 13, SocketFlags.None, this.ipEndPoint);
+                                this.Socket.Send(buffer, 0, 13, this.ipEndPoint);
                             }
                             catch (Exception e)
                             {
@@ -293,7 +274,7 @@ namespace ET
                                 buffer.WriteTo(5, kChannel.RemoteConn);
                                 Log.Info($"kservice syn: {kChannel.Id} {remoteConn} {localConn}");
                                 
-                                this.Socket.SendTo(buffer, 0, 9, SocketFlags.None, kChannel.RemoteAddress);
+                                this.Socket.Send(buffer, 0, 9, kChannel.RemoteAddress);
                             }
                             catch (Exception e)
                             {
@@ -457,7 +438,7 @@ namespace ET
                 buffer.WriteTo(9, (uint) error);
                 for (int i = 0; i < times; ++i)
                 {
-                    this.Socket.SendTo(buffer, 0, 13, SocketFlags.None, address);
+                    this.Socket.Send(buffer, 0, 13, address);
                 }
             }
             catch (Exception e)

+ 1 - 1
Unity/Assets/Scripts/Core/Network/TService.cs

@@ -124,7 +124,7 @@ namespace ET
 			this.idChannels.Add(channel.Id, channel);
 		}
 		
-		private TChannel Get(long id)
+		public TChannel Get(long id)
 		{
 			TChannel channel = null;
 			this.idChannels.TryGetValue(id, out channel);