Преглед изворни кода

初步实现websocket服务端,测试通过

tanghai пре 7 година
родитељ
комит
da3d635da5

+ 7 - 3
Server/App/Program.cs

@@ -1,4 +1,5 @@
 using System;
+using System.Collections.Generic;
 using System.Net;
 using System.Threading;
 using ETModel;
@@ -47,8 +48,9 @@ namespace App
 				switch (startConfig.AppType)
 				{
 					case AppType.Manager:
-						Game.Scene.AddComponent<NetInnerComponent, IPEndPoint>(innerConfig.IPEndPoint);
-						Game.Scene.AddComponent<NetOuterComponent, IPEndPoint>(outerConfig.IPEndPoint);
+						//Game.Scene.AddComponent<NetInnerComponent, IPEndPoint>(innerConfig.IPEndPoint);
+						//Game.Scene.AddComponent<NetOuterComponent, IPEndPoint>(outerConfig.IPEndPoint);
+						Game.Scene.AddComponent<NetOuterComponent, List<string>>(new List<string>() {"http://*:8080/"});
 						Game.Scene.AddComponent<AppManagerComponent>();
 						break;
 					case AppType.Realm:
@@ -99,8 +101,10 @@ namespace App
 						// Game.Scene.AddComponent<HttpComponent>();
 						break;
 					case AppType.Benchmark:
+						//Game.Scene.AddComponent<NetOuterComponent>();
+						//Game.Scene.AddComponent<BenchmarkComponent, IPEndPoint>(clientConfig.IPEndPoint);
 						Game.Scene.AddComponent<NetOuterComponent>();
-						Game.Scene.AddComponent<BenchmarkComponent, IPEndPoint>(clientConfig.IPEndPoint);
+						Game.Scene.AddComponent<WebSocketBenchmarkComponent>();
 						break;
 					default:
 						throw new Exception($"命令行参数没有设置正确的AppType: {startConfig.AppType}");

+ 1 - 1
Server/Hotfix/Module/Benchmark/BenchmarkComponentSystem.cs

@@ -14,7 +14,7 @@ namespace ETHotfix
 		}
 	}
 
-	public static class BenchmarkComponentEx
+	public static class BenchmarkComponentHelper
 	{
 		public static void Awake(this BenchmarkComponent self, IPEndPoint ipEndPoint)
 		{

+ 81 - 0
Server/Hotfix/Module/Benchmark/WebSocketBenchmarkComponentSystem.cs

@@ -0,0 +1,81 @@
+using System;
+using System.Threading.Tasks;
+using ETModel;
+
+namespace ETHotfix
+{
+	[ObjectSystem]
+	public class WebSocketBenchmarkComponentSystem : AwakeSystem<WebSocketBenchmarkComponent>
+	{
+		public override void Awake(WebSocketBenchmarkComponent self)
+		{
+			self.Awake();
+		}
+	}
+
+	public static class WebSocketBenchmarkComponentHelper
+	{
+		public static void Awake(this WebSocketBenchmarkComponent self)
+		{
+			try
+			{
+				NetOuterComponent networkComponent = Game.Scene.GetComponent<NetOuterComponent>();
+				for (int i = 0; i < 1000; i++)
+				{
+					self.TestAsync(networkComponent, i);
+				}
+			}
+			catch (Exception e)
+			{
+				Log.Error(e);
+			}
+		}
+		
+		public static async void TestAsync(this WebSocketBenchmarkComponent self, NetOuterComponent networkComponent, int j)
+		{
+			try
+			{
+				using (Session session = networkComponent.Create($"ws://127.0.0.1:8080"))
+				{
+					int i = 0;
+					while (i < 100000000)
+					{
+						++i;
+						await self.Send(session, j);
+					}
+				}
+			}
+			catch (RpcException e)
+			{
+				Log.Error(e);
+			}
+			catch (Exception e)
+			{
+				Log.Error(e);
+			}
+		}
+		
+		public static async Task Send(this WebSocketBenchmarkComponent self, Session session, int j)
+		{
+			try
+			{
+				await session.Call(new C2R_Ping());
+				++self.k;
+
+				if (self.k % 10000 != 0)
+				{
+					return;
+				}
+
+				long time2 = TimeHelper.ClientNow();
+				long time = time2 - self.time1;
+				self.time1 = time2;
+				Log.Info($"Benchmark k: {self.k} 每1W次耗时: {time} ms {session.Network.Count}");
+			}
+			catch (Exception e)
+			{
+				Log.Error(e);
+			}
+		}
+	}
+}

+ 14 - 2
Server/Hotfix/Module/Message/NetOuterComponentSystem.cs

@@ -1,4 +1,5 @@
-using System.Net;
+using System.Collections.Generic;
+using System.Net;
 using ETModel;
 
 namespace ETHotfix
@@ -8,7 +9,7 @@ namespace ETHotfix
 	{
 		public override void Awake(NetOuterComponent self)
 		{
-			self.Awake(NetworkProtocol.TCP);
+			self.Awake(NetworkProtocol.WebSocket);
 			self.MessagePacker = new ProtobufPacker();
 			self.MessageDispatcher = new OuterMessageDispatcher();
 		}
@@ -24,6 +25,17 @@ namespace ETHotfix
 			self.MessageDispatcher = new OuterMessageDispatcher();
 		}
 	}
+	
+	[ObjectSystem]
+	public class NetOuterComponentAwake2System : AwakeSystem<NetOuterComponent, List<string>>
+	{
+		public override void Awake(NetOuterComponent self, List<string> prefixs)
+		{
+			self.Awake(NetworkProtocol.WebSocket, prefixs);
+			self.MessagePacker = new ProtobufPacker();
+			self.MessageDispatcher = new OuterMessageDispatcher();
+		}
+	}
 
 	[ObjectSystem]
 	public class NetOuterComponentUpdateSystem : UpdateSystem<NetOuterComponent>

+ 13 - 0
Server/Model/Module/Benchmark/WebSocketBenchmarkComponent.cs

@@ -0,0 +1,13 @@
+using System;
+using System.Net;
+using System.Threading.Tasks;
+
+namespace ETModel
+{
+	public class WebSocketBenchmarkComponent: Component
+	{
+		public int k;
+
+		public long time1 = TimeHelper.ClientNow();
+	}
+}

+ 186 - 0
Server/Model/Module/Message/Network/WebSocket/WChannel.cs

@@ -0,0 +1,186 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Net.WebSockets;
+using System.Threading;
+
+namespace ETModel
+{
+    public class WChannel: AChannel
+    {
+        private readonly HttpListenerWebSocketContext webSocketContext;
+
+        private readonly WebSocket webSocket;
+
+		private readonly Queue<byte[]> queue = new Queue<byte[]>();
+
+        private bool isSending;
+
+        private bool isConnected;
+
+        private readonly MemoryStream memoryStream;
+        
+        public WChannel(HttpListenerWebSocketContext webSocketContext, AService service): base(service, ChannelType.Accept)
+        {
+            this.InstanceId = IdGenerater.GenerateId();
+            
+            this.webSocketContext = webSocketContext;
+
+            this.webSocket = webSocketContext.WebSocket;
+            
+            this.memoryStream = this.GetService().MemoryStreamManager.GetStream("message", ushort.MaxValue);
+
+            isConnected = true;
+        }
+        
+        public WChannel(WebSocket webSocket, AService service): base(service, ChannelType.Connect)
+        {
+            this.InstanceId = IdGenerater.GenerateId();
+
+            this.webSocket = webSocket;
+            
+            this.memoryStream = this.GetService().MemoryStreamManager.GetStream("message", ushort.MaxValue);
+        }
+
+        public override void Dispose()
+        {
+            if (this.IsDisposed)
+            {
+                return;
+            }
+            base.Dispose();
+            
+            this.webSocket.Dispose();
+            
+            this.memoryStream.Dispose();
+        }
+
+        public override MemoryStream Stream
+        {
+            get
+            {
+                return this.memoryStream;
+            }
+        }
+        
+        public override void Start()
+        {
+            if (!this.isConnected)
+            {
+                return;
+            }
+            this.StartRecv();
+            this.StartSend();
+        }
+        
+        private WService GetService()
+        {
+            return (WService)this.service;
+        }
+
+        public async void ConnectAsync(string url)
+        {
+            try
+            {
+                await ((ClientWebSocket)this.webSocket).ConnectAsync(new Uri(url), new CancellationToken());
+                isConnected = true;
+                this.Start();
+            }
+            catch (Exception e)
+            {
+                Log.Error(e);
+                this.OnError(ErrorCode.ERR_WebsocketConnectError);
+            }
+        }
+
+        public override void Send(MemoryStream stream)
+        {
+            byte[] bytes = new byte[stream.Length];
+            Array.Copy(stream.GetBuffer(), bytes, bytes.Length);
+            this.queue.Enqueue(bytes);
+
+            if (this.isConnected)
+            {
+                this.StartSend();
+            }
+        }
+
+        public async void StartSend()
+        {
+            try
+            {
+                if (this.isSending)
+                {
+                    return;
+                }
+            
+                this.isSending = true;
+            
+                while (true)
+                {
+                    if (this.queue.Count == 0)
+                    {
+                        this.isSending = false;
+                        return;
+                    }
+
+                    byte[] bytes = this.queue.Dequeue();
+                    try
+                    {
+                        await this.webSocket.SendAsync(new ReadOnlyMemory<byte>(bytes, 0, bytes.Length), WebSocketMessageType.Binary, true, new CancellationToken());
+                    }
+                    catch (Exception e)
+                    {
+                        Log.Error(e);
+                        this.OnError(ErrorCode.ERR_WebsocketSendError);
+                        return;
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                Log.Error(e);
+            }
+        }
+
+        public async void StartRecv()
+        {
+            try
+            {
+                while (true)
+                {
+                    ValueWebSocketReceiveResult receiveResult;
+                    try
+                    {
+                        receiveResult = await this.webSocket.ReceiveAsync(new Memory<byte>(this.Stream.GetBuffer(), 0, this.Stream.Capacity), new CancellationToken());
+                        if (receiveResult.MessageType == WebSocketMessageType.Close)
+                        {
+                            this.OnError(ErrorCode.ERR_WebsocketPeerReset);
+                            return;
+                        }
+
+                        if (receiveResult.Count > ushort.MaxValue)
+                        {
+                            await this.webSocket.CloseAsync(WebSocketCloseStatus.MessageTooBig, $"message too big: {receiveResult.Count}", new CancellationToken());
+                            this.OnError(ErrorCode.ERR_WebsocketMessageTooBig);
+                            return;
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        Log.Error(e);
+                        this.OnError(ErrorCode.ERR_WebsocketRecvError);
+                        return;
+                    }
+                
+                    this.Stream.SetLength(receiveResult.Count);
+                    this.OnRead(this.Stream);
+                }
+            }
+            catch (Exception e)
+            {
+                Log.Error(e);
+            }
+        }
+    }
+}

+ 101 - 0
Server/Model/Module/Message/Network/WebSocket/WService.cs

@@ -0,0 +1,101 @@
+using System;
+using System.Collections.Generic;
+using System.Net;
+using System.Net.WebSockets;
+using Microsoft.IO;
+
+namespace ETModel
+{
+    public class WService: AService
+    {
+        private readonly HttpListener httpListener;
+        
+        private readonly Dictionary<long, WChannel> channels = new Dictionary<long, WChannel>();
+        
+        public RecyclableMemoryStreamManager MemoryStreamManager = new RecyclableMemoryStreamManager();
+
+        public WService(IEnumerable<string> prefixs)
+        {
+            this.InstanceId = IdGenerater.GenerateId();
+            
+            this.httpListener = new HttpListener();
+            
+            foreach (string prefix in prefixs)
+            {
+                this.httpListener.Prefixes.Add(prefix);
+            }
+        }
+        
+        public WService()
+        {
+        }
+        
+        public override AChannel GetChannel(long id)
+        {
+            WChannel channel;
+            this.channels.TryGetValue(id, out channel);
+            return channel;
+        }
+
+        public override AChannel ConnectChannel(IPEndPoint ipEndPoint)
+        {
+            throw new NotImplementedException();
+        }
+
+        public override AChannel ConnectChannel(string address)
+        {
+			ClientWebSocket webSocket = new ClientWebSocket();
+            WChannel channel = new WChannel(webSocket, this);
+            this.channels[channel.Id] = channel;
+            channel.ConnectAsync(address);
+            return channel;
+        }
+
+        public override void Remove(long id)
+        {
+            WChannel channel;
+            if (!this.channels.TryGetValue(id, out channel))
+            {
+                return;
+            }
+
+            this.channels.Remove(id);
+            channel.Dispose();
+        }
+
+        public override void Update()
+        {
+            
+        }
+
+        public override async void Start()
+        {
+            if (this.httpListener == null)
+            {
+                return;
+            }
+            
+            httpListener.Start();
+
+            while (true)
+            {
+                try
+                {
+                    HttpListenerContext httpListenerContext = await this.httpListener.GetContextAsync();
+                
+                    HttpListenerWebSocketContext webSocketContext = await httpListenerContext.AcceptWebSocketAsync(null);
+                    
+                    WChannel channel = new WChannel(webSocketContext, this);
+                
+                    this.channels[channel.Id] = channel;
+                    
+                    this.OnAccept(channel);
+                }
+                catch (Exception e)
+                {
+                    Log.Error(e);
+                }
+            }
+        }
+    }
+}

+ 7 - 0
Unity/Assets/Scripts/Module/Message/ErrorCode.cs

@@ -34,6 +34,13 @@ namespace ETModel
 		public const int ERR_SocketCantSend = 202009;
 		public const int ERR_SocketError = 202010;
 
+		public const int ERR_WebsocketPeerReset = 203001;
+		public const int ERR_WebsocketMessageTooBig = 203002;
+		public const int ERR_WebsocketError = 203003;
+		public const int ERR_WebsocketConnectError = 203004;
+		public const int ERR_WebsocketSendError = 203005;
+		public const int ERR_WebsocketRecvError = 203006;
+
 		public static bool IsRpcNeedThrowException(int error)
 		{
 			if (error == 0)

+ 3 - 0
Unity/Assets/Scripts/Module/Message/Network/AService.cs

@@ -7,6 +7,7 @@ namespace ETModel
 	{
 		KCP,
 		TCP,
+		WebSocket,
 	}
 
 	public abstract class AService: Component
@@ -33,6 +34,8 @@ namespace ETModel
 		}
 
 		public abstract AChannel ConnectChannel(IPEndPoint ipEndPoint);
+		
+		public abstract AChannel ConnectChannel(string address);
 
 		public abstract void Remove(long channelId);
 

+ 5 - 0
Unity/Assets/Scripts/Module/Message/Network/KCP/KService.cs

@@ -294,6 +294,11 @@ namespace ETModel
 			return channel;
 		}
 
+		public override AChannel ConnectChannel(string address)
+		{
+			throw new NotImplementedException();
+		}
+
 		public override void Remove(long id)
 		{
 			KChannel channel;

+ 5 - 0
Unity/Assets/Scripts/Module/Message/Network/TCP/TService.cs

@@ -131,6 +131,11 @@ namespace ETModel
 			return channel;
 		}
 
+		public override AChannel ConnectChannel(string address)
+		{
+			throw new NotImplementedException();
+		}
+
 		public override void Remove(long id)
 		{
 			TChannel channel;

+ 34 - 2
Unity/Assets/Scripts/Module/Message/NetworkComponent.cs

@@ -29,8 +29,11 @@ namespace ETModel
 					case NetworkProtocol.TCP:
 						this.Service = new TService();
 						break;
-					default:
-						throw new ArgumentOutOfRangeException();
+#if SERVER
+					case NetworkProtocol.WebSocket:
+						this.Service = new WService();
+						break;
+#endif
 				}
 
 				this.Service.AcceptCallback += this.OnAccept;
@@ -69,6 +72,24 @@ namespace ETModel
 			}
 		}
 
+#if SERVER
+		public void Awake(NetworkProtocol protocol, List<string> prefixs)
+		{
+			try
+			{
+				this.Service = new WService(prefixs);
+				
+				this.Service.AcceptCallback += this.OnAccept;
+				
+				this.Start();
+			}
+			catch (Exception e)
+			{
+				throw new Exception($"websocket error: {prefixs}", e);
+			}
+		}
+#endif
+
 		public void Start()
 		{
 			this.Service.Start();
@@ -113,6 +134,17 @@ namespace ETModel
 			this.sessions.Add(session.Id, session);
 			return session;
 		}
+		
+		/// <summary>
+		/// 创建一个新Session
+		/// </summary>
+		public Session Create(string url)
+		{
+			AChannel channel = this.Service.ConnectChannel(url);
+			Session session = ComponentFactory.CreateWithParent<Session, AChannel>(this, channel);
+			this.sessions.Add(session.Id, session);
+			return session;
+		}
 
 		public void Update()
 		{

+ 8 - 4
Unity/Unity.csproj

@@ -12,13 +12,16 @@
     <ProjectTypeGuids>{E097FAD1-6243-4DAD-9C02-E9B9EFC3FFC1};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
     <TargetFrameworkIdentifier>.NETFramework</TargetFrameworkIdentifier>
     <TargetFrameworkVersion>v4.6</TargetFrameworkVersion>
-    <TargetFrameworkProfile></TargetFrameworkProfile>
-    <CompilerResponseFile></CompilerResponseFile>
+    <TargetFrameworkProfile>
+    </TargetFrameworkProfile>
+    <CompilerResponseFile>
+    </CompilerResponseFile>
     <UnityProjectGenerator>VSTU</UnityProjectGenerator>
     <UnityProjectType>Game:1</UnityProjectType>
     <UnityBuildTarget>StandaloneWindows64:19</UnityBuildTarget>
     <UnityVersion>2017.4.9f1</UnityVersion>
-    <RootNamespace></RootNamespace>
+    <RootNamespace>
+    </RootNamespace>
     <LangVersion>6</LangVersion>
   </PropertyGroup>
   <PropertyGroup>
@@ -816,6 +819,7 @@
     <None Include="Assets\StreamingAssets\Version.txt" />
     <None Include="Assets\link.xml" />
   </ItemGroup>
+  <ItemGroup />
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <Target Name="GenerateTargetFrameworkMonikerAttribute" />
-</Project>
+</Project>