Просмотр исходного кода

加回WebSocket跟Http组件,方便跟平台对接

tanghai 4 лет назад
Родитель
Сommit
da0893593b

+ 127 - 0
Server/Hotfix/Module/Http/HttpComponentSystem.cs

@@ -0,0 +1,127 @@
+using System;
+using System.Collections.Generic;
+using System.Net;
+
+namespace ET
+{
+    public class HttpComponentAwakeSystem : AwakeSystem<HttpComponent, string>
+    {
+        public override void Awake(HttpComponent self, string address)
+        {
+            try
+            {
+                self.Load();
+                
+                self.Listener = new HttpListener();
+
+                foreach (string s in address.Split(';'))
+                {
+                    if (s.Trim() == "")
+                    {
+                        continue;
+                    }
+                    self.Listener.Prefixes.Add(s);
+                }
+
+                self.Listener.Start();
+
+                self.Accept().Coroutine();
+            }
+            catch (HttpListenerException e)
+            {
+                throw new Exception($"请现在cmd中运行: netsh http add urlacl url=http://*:你的address中的端口/ user=Everyone, address: {address}", e);
+            }
+        }
+    }
+
+    [ObjectSystem]
+    public class HttpComponentLoadSystem: LoadSystem<HttpComponent>
+    {
+        public override void Load(HttpComponent self)
+        {
+            self.Load();
+        }
+    }
+
+    [ObjectSystem]
+    public class HttpComponentDestroySystem: DestroySystem<HttpComponent>
+    {
+        public override void Destroy(HttpComponent self)
+        {
+            self.Listener.Stop();
+            self.Listener.Close();
+        }
+    }
+    
+    public static class HttpComponentSystem
+    {
+        public static void Load(this HttpComponent self)
+        {
+            self.dispatcher = new Dictionary<string, IHttpHandler>();
+
+            HashSet<Type> types = EventSystem.Instance.GetTypes(typeof (HttpHandlerAttribute));
+
+            SceneType sceneType = self.GetParent<Scene>().SceneType;
+
+            foreach (Type type in types)
+            {
+                object[] attrs = type.GetCustomAttributes(typeof(HttpHandlerAttribute), false);
+                if (attrs.Length == 0)
+                {
+                    continue;
+                }
+
+                HttpHandlerAttribute httpHandlerAttribute = (HttpHandlerAttribute)attrs[0];
+
+                if (httpHandlerAttribute.SceneType != sceneType)
+                {
+                    continue;
+                }
+
+                object obj = Activator.CreateInstance(type);
+
+                IHttpHandler ihttpHandler = obj as IHttpHandler;
+                if (ihttpHandler == null)
+                {
+                    throw new Exception($"HttpHandler handler not inherit IHttpHandler class: {obj.GetType().FullName}");
+                }
+                self.dispatcher.Add(httpHandlerAttribute.Path, ihttpHandler);
+            }
+        }
+        
+        public static async ETTask Accept(this HttpComponent self)
+        {
+            long instanceId = self.InstanceId;
+            while (self.InstanceId == instanceId)
+            {
+                try
+                {
+                    HttpListenerContext context = await self.Listener.GetContextAsync();
+                    self.Handle(context).Coroutine();
+                }
+                catch (Exception e)
+                {
+                    Log.Error(e);
+                }
+            }
+        }
+
+        public static async ETTask Handle(this HttpComponent self, HttpListenerContext context)
+        {
+            try
+            {
+                IHttpHandler handler;
+                if (self.dispatcher.TryGetValue(context.Request.Url.AbsolutePath, out handler))
+                {
+                    await handler.Handle(self.Domain, context);
+                }
+            }
+            catch (Exception e)
+            {
+                Log.Error(e);
+            }
+            context.Request.InputStream.Dispose();
+            context.Response.OutputStream.Dispose();
+        }
+    }
+}

+ 14 - 0
Server/Model/Module/Http/HttpComponent.cs

@@ -0,0 +1,14 @@
+using System.Collections.Generic;
+using System.Net;
+
+namespace ET
+{
+    /// <summary>
+    /// http请求分发器
+    /// </summary>
+    public class HttpComponent: Entity
+    {
+        public HttpListener Listener;
+        public Dictionary<string, IHttpHandler> dispatcher;
+    }
+}

+ 15 - 0
Server/Model/Module/Http/HttpHandlerAttribute.cs

@@ -0,0 +1,15 @@
+namespace ET
+{
+    public class HttpHandlerAttribute: BaseAttribute
+    {
+        public SceneType SceneType { get; }
+
+        public string Path { get; }
+
+        public HttpHandlerAttribute(SceneType sceneType, string path)
+        {
+            this.SceneType = sceneType;
+            this.Path = path;
+        }
+    }
+}

+ 9 - 0
Server/Model/Module/Http/IHttpHandler.cs

@@ -0,0 +1,9 @@
+using System.Net;
+
+namespace ET
+{
+    public interface IHttpHandler
+    {
+        ETTask Handle(Entity domain, HttpListenerContext context);
+    }
+}

+ 240 - 0
Unity/Assets/Mono/Module/NetworkTCP/WChannel.cs

@@ -0,0 +1,240 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Net.WebSockets;
+using System.Threading;
+
+namespace ET
+{
+    public class WChannel: AChannel
+    {
+        public HttpListenerWebSocketContext WebSocketContext { get; }
+
+        private readonly WService Service;
+
+        private readonly WebSocket webSocket;
+
+        private readonly Queue<byte[]> queue = new Queue<byte[]>();
+
+        private bool isSending;
+
+        private bool isConnected;
+
+        private readonly MemoryStream recvStream;
+
+        private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
+
+        public WChannel(long id, HttpListenerWebSocketContext webSocketContext, WService service)
+        {
+            this.Id = id;
+            this.Service = service;
+            this.ChannelType = ChannelType.Accept;
+            this.WebSocketContext = webSocketContext;
+            this.webSocket = webSocketContext.WebSocket;
+            this.recvStream = new MemoryStream(ushort.MaxValue);
+
+            isConnected = true;
+            
+            this.Service.ThreadSynchronizationContext.PostNext(()=>
+            {
+                this.StartRecv().Coroutine();
+                this.StartSend().Coroutine();
+            });
+        }
+
+        public WChannel(long id, WebSocket webSocket, string connectUrl, WService service)
+        {
+            this.Id = id;
+            this.Service = service;
+            this.ChannelType = ChannelType.Connect;
+            this.webSocket = webSocket;
+            this.recvStream = new MemoryStream(ushort.MaxValue);
+
+            isConnected = false;
+            
+            this.Service.ThreadSynchronizationContext.PostNext(()=>this.ConnectAsync(connectUrl).Coroutine());
+        }
+
+        public override void Dispose()
+        {
+            if (this.IsDisposed)
+            {
+                return;
+            }
+
+            this.cancellationTokenSource.Cancel();
+            this.cancellationTokenSource.Dispose();
+            this.cancellationTokenSource = null;
+
+            this.webSocket.Dispose();
+        }
+
+        public async ETTask ConnectAsync(string url)
+        {
+            try
+            {
+                await ((ClientWebSocket) this.webSocket).ConnectAsync(new Uri(url), cancellationTokenSource.Token);
+                isConnected = true;
+                
+                this.StartRecv().Coroutine();
+                this.StartSend().Coroutine();
+            }
+            catch (Exception e)
+            {
+                Log.Error(e);
+                this.OnError(ErrorCore.ERR_WebsocketConnectError);
+            }
+        }
+
+        public void Send(MemoryStream stream)
+        {
+            byte[] bytes = new byte[stream.Length];
+            Array.Copy(stream.GetBuffer(), bytes, bytes.Length);
+            this.queue.Enqueue(bytes);
+
+            if (this.isConnected)
+            {
+                this.StartSend().Coroutine();
+            }
+        }
+
+        public async ETTask StartSend()
+        {
+            if (this.IsDisposed)
+            {
+                return;
+            }
+
+            try
+            {
+                if (this.isSending)
+                {
+                    return;
+                }
+
+                this.isSending = true;
+
+                while (true)
+                {
+                    if (this.queue.Count == 0)
+                    {
+                        this.isSending = false;
+                        return;
+                    }
+
+                    byte[] bytes = this.queue.Dequeue();
+                    try
+                    {
+                        await this.webSocket.SendAsync(new ArraySegment<byte>(bytes, 0, bytes.Length), WebSocketMessageType.Binary, true, cancellationTokenSource.Token);
+                        if (this.IsDisposed)
+                        {
+                            return;
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        Log.Error(e);
+                        this.OnError(ErrorCore.ERR_WebsocketSendError);
+                        return;
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                Log.Error(e);
+            }
+        }
+
+        private byte[] cache = new byte[ushort.MaxValue];
+
+        public async ETTask StartRecv()
+        {
+            if (this.IsDisposed)
+            {
+                return;
+            }
+
+            try
+            {
+                while (true)
+                {
+#if NOT_UNITY
+                    ValueWebSocketReceiveResult receiveResult;
+#else
+                    WebSocketReceiveResult receiveResult;
+#endif
+                    int receiveCount = 0;
+                    do
+                    {
+#if NOT_UNITY
+                        receiveResult = await this.webSocket.ReceiveAsync(
+                            new Memory<byte>(cache, receiveCount, this.cache.Length - receiveCount),
+                            cancellationTokenSource.Token);
+#else
+                        receiveResult = await this.webSocket.ReceiveAsync(
+                            new ArraySegment<byte>(this.cache, receiveCount, this.cache.Length - receiveCount), 
+                            cancellationTokenSource.Token);
+#endif
+                        if (this.IsDisposed)
+                        {
+                            return;
+                        }
+
+                        receiveCount += receiveResult.Count;
+                    }
+                    while (!receiveResult.EndOfMessage);
+
+                    if (receiveResult.MessageType == WebSocketMessageType.Close)
+                    {
+                        this.OnError(ErrorCore.ERR_WebsocketPeerReset);
+                        return;
+                    }
+
+                    if (receiveResult.Count > ushort.MaxValue)
+                    {
+                        await this.webSocket.CloseAsync(WebSocketCloseStatus.MessageTooBig, $"message too big: {receiveCount}",
+                            cancellationTokenSource.Token);
+                        this.OnError(ErrorCore.ERR_WebsocketMessageTooBig);
+                        return;
+                    }
+                    
+                    this.recvStream.SetLength(receiveCount);
+                    this.recvStream.Seek(2, SeekOrigin.Begin);
+                    Array.Copy(this.cache, 0, this.recvStream.GetBuffer(), 0, receiveCount);
+                    this.OnRead(this.recvStream);
+                }
+            }
+            catch (Exception e)
+            {
+                Log.Error(e);
+                this.OnError(ErrorCore.ERR_WebsocketRecvError);
+            }
+        }
+        
+        private void OnRead(MemoryStream memoryStream)
+        {
+            try
+            {
+                long channelId = this.Id;
+                this.Service.OnRead(channelId, memoryStream);
+            }
+            catch (Exception e)
+            {
+                Log.Error($"{this.RemoteAddress} {memoryStream.Length} {e}");
+                // 出现任何消息解析异常都要断开Session,防止客户端伪造消息
+                this.OnError(ErrorCore.ERR_PacketParserError);
+            }
+        }
+        
+        private void OnError(int error)
+        {
+            Log.Debug($"WChannel error: {error} {this.RemoteAddress}");
+			
+            long channelId = this.Id;
+			
+            this.Service.Remove(channelId);
+			
+            this.Service.OnError(channelId, error);
+        }
+    }
+}

+ 144 - 0
Unity/Assets/Mono/Module/NetworkTCP/WService.cs

@@ -0,0 +1,144 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Net;
+using System.Net.WebSockets;
+
+namespace ET
+{
+    public class WService: AService
+    {
+        private long idGenerater = 200000000;
+        
+        private HttpListener httpListener;
+        
+        private readonly Dictionary<long, WChannel> channels = new Dictionary<long, WChannel>();
+
+        public WService(ThreadSynchronizationContext threadSynchronizationContext, IEnumerable<string> prefixs)
+        {
+            this.ThreadSynchronizationContext = threadSynchronizationContext;
+            
+            this.httpListener = new HttpListener();
+
+            StartAccept(prefixs).Coroutine();
+        }
+        
+        public WService(ThreadSynchronizationContext threadSynchronizationContext)
+        {
+            this.ThreadSynchronizationContext = threadSynchronizationContext;
+        }
+        
+        private long GetId
+        {
+            get
+            {
+                return ++this.idGenerater;
+            }
+        }
+        
+        public WChannel Create(string address, long id)
+        {
+			ClientWebSocket webSocket = new ClientWebSocket();
+            WChannel channel = new WChannel(id, webSocket, address, this);
+            this.channels[channel.Id] = channel;
+            return channel;
+        }
+
+        public override void Remove(long id)
+        {
+            WChannel channel;
+            if (!this.channels.TryGetValue(id, out channel))
+            {
+                return;
+            }
+
+            this.channels.Remove(id);
+            channel.Dispose();
+        }
+
+        public override bool IsDispose()
+        {
+            return this.ThreadSynchronizationContext == null;
+        }
+
+        protected void Get(long id, string address)
+        {
+            if (!this.channels.TryGetValue(id, out _))
+            {
+                this.Create(address, id);
+            }
+        }
+
+        public override void Dispose()
+        {
+            this.ThreadSynchronizationContext = null;
+            this.httpListener?.Close();
+            this.httpListener = null;
+        }
+
+        private async ETTask StartAccept(IEnumerable<string> prefixs)
+        {
+            try
+            {
+                foreach (string prefix in prefixs)
+                {
+                    this.httpListener.Prefixes.Add(prefix);
+                }
+                
+                httpListener.Start();
+
+                while (true)
+                {
+                    try
+                    {
+                        HttpListenerContext httpListenerContext = await this.httpListener.GetContextAsync();
+
+                        HttpListenerWebSocketContext webSocketContext = await httpListenerContext.AcceptWebSocketAsync(null);
+
+                        WChannel channel = new WChannel(this.GetId, webSocketContext, this);
+
+                        this.channels[channel.Id] = channel;
+
+                        this.OnAccept(channel.Id, channel.RemoteAddress);
+                    }
+                    catch (Exception e)
+                    {
+                        Log.Error(e);
+                    }
+                }
+            }
+            catch (HttpListenerException e)
+            {
+                if (e.ErrorCode == 5)
+                {
+                    throw new Exception($"CMD管理员中输入: netsh http add urlacl url=http://*:8080/ user=Everyone", e);
+                }
+
+                Log.Error(e);
+            }
+            catch (Exception e)
+            {
+                Log.Error(e);
+            }
+        }
+        
+        protected override void Get(long id, IPEndPoint address)
+        {
+            throw new NotImplementedException();
+        }
+
+        protected override void Send(long channelId, long actorId, MemoryStream stream)
+        {
+            this.channels.TryGetValue(channelId, out WChannel channel);
+            if (channel == null)
+            {
+                return;
+            }
+            channel.Send(stream);
+        }
+
+        public override void Update()
+        {
+        }
+    }
+}

+ 2 - 0
Unity/Unity.Mono.csproj

@@ -72,6 +72,8 @@
      <Compile Include="Assets\Mono\ILRuntime\Generate\System_BitConverter_Binding.cs" />
      <Compile Include="Assets\Mono\Core\Helper\NetworkHelper.cs" />
      <Compile Include="Assets\Mono\Core\DoubleMap.cs" />
+     <Compile Include="Assets\Mono\Module\NetworkTCP\WChannel.cs" />
+     <Compile Include="Assets\Mono\Module\NetworkTCP\WService.cs" />
      <Compile Include="Assets\Mono\MonoBehaviour\Init.cs" />
      <Compile Include="Assets\Mono\ILRuntime\Generate\System_Collections_Generic_Dictionary_2_String_ILTypeInstance_Binding_t3.cs" />
      <Compile Include="Assets\Mono\ILRuntime\Generate\System_Collections_Generic_Dictionary_2_Type_ILTypeInstance_Binding_V.cs" />