Explorar el Código

websocket RecieveAsync方法未必每次都能接收完整的消息,所以需要用循环来接收直到EndOfMessage为止

tanghai hace 6 años
padre
commit
d076ea6fa7
Se han modificado 1 ficheros con 48 adiciones y 35 borrados
  1. 48 35
      Unity/Assets/Model/Module/Message/Network/WebSocket/WChannel.cs

+ 48 - 35
Unity/Assets/Model/Module/Message/Network/WebSocket/WChannel.cs

@@ -12,7 +12,7 @@ namespace ETModel
 
         private readonly WebSocket webSocket;
 
-		private readonly Queue<byte[]> queue = new Queue<byte[]>();
+        private readonly Queue<byte[]> queue = new Queue<byte[]>();
 
         private bool isSending;
 
@@ -23,23 +23,23 @@ namespace ETModel
         private readonly MemoryStream recvStream;
 
         private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
-        
+
         public WChannel(HttpListenerWebSocketContext webSocketContext, AService service): base(service, ChannelType.Accept)
         {
             this.WebSocketContext = webSocketContext;
 
             this.webSocket = webSocketContext.WebSocket;
-            
+
             this.memoryStream = this.GetService().MemoryStreamManager.GetStream("message", ushort.MaxValue);
             this.recvStream = this.GetService().MemoryStreamManager.GetStream("message", ushort.MaxValue);
 
             isConnected = true;
         }
-        
+
         public WChannel(WebSocket webSocket, AService service): base(service, ChannelType.Connect)
         {
             this.webSocket = webSocket;
-            
+
             this.memoryStream = this.GetService().MemoryStreamManager.GetStream("message", ushort.MaxValue);
             this.recvStream = this.GetService().MemoryStreamManager.GetStream("message", ushort.MaxValue);
 
@@ -52,15 +52,15 @@ namespace ETModel
             {
                 return;
             }
-            
+
             base.Dispose();
-            
+
             this.cancellationTokenSource.Cancel();
             this.cancellationTokenSource.Dispose();
             this.cancellationTokenSource = null;
-            
+
             this.webSocket.Dispose();
-            
+
             this.memoryStream.Dispose();
         }
 
@@ -71,27 +71,28 @@ namespace ETModel
                 return this.memoryStream;
             }
         }
-        
+
         public override void Start()
         {
             if (!this.isConnected)
             {
                 return;
             }
+
             this.StartRecv().Coroutine();
             this.StartSend().Coroutine();
         }
-        
+
         private WService GetService()
         {
-            return (WService)this.Service;
+            return (WService) this.Service;
         }
 
         public async ETVoid ConnectAsync(string url)
         {
             try
             {
-                await ((ClientWebSocket)this.webSocket).ConnectAsync(new Uri(url), cancellationTokenSource.Token);
+                await ((ClientWebSocket) this.webSocket).ConnectAsync(new Uri(url), cancellationTokenSource.Token);
                 isConnected = true;
                 this.Start();
             }
@@ -120,15 +121,16 @@ namespace ETModel
             {
                 return;
             }
+
             try
             {
                 if (this.isSending)
                 {
                     return;
                 }
-            
+
                 this.isSending = true;
-            
+
                 while (true)
                 {
                     if (this.queue.Count == 0)
@@ -166,48 +168,59 @@ namespace ETModel
             {
                 return;
             }
+
             try
             {
                 while (true)
                 {
-                    try
+#if SERVER
+                    ValueWebSocketReceiveResult receiveResult;
+#else
+                    WebSocketReceiveResult receiveResult;
+#endif
+                    int receiveCount = 0;
+                    do
                     {
 #if SERVER
-                        ValueWebSocketReceiveResult receiveResult = await this.webSocket.ReceiveAsync(new Memory<byte>(this.recvStream.GetBuffer(), 0, this.recvStream.Capacity), cancellationTokenSource.Token);
+                        receiveResult = await this.webSocket.ReceiveAsync(
+                            new Memory<byte>(this.recvStream.GetBuffer(), receiveCount, this.recvStream.Capacity - receiveCount),
+                            cancellationTokenSource.Token);
 #else
-                        WebSocketReceiveResult receiveResult = await this.webSocket.ReceiveAsync(new ArraySegment<byte>(this.recvStream.GetBuffer(), 0, this.recvStream.Capacity), cancellationTokenSource.Token);
+                        receiveResult = await this.webSocket.ReceiveAsync(
+                            new ArraySegment<byte>(this.recvStream.GetBuffer(), receiveCount, this.recvStream.Capacity - receiveCount), 
+                            cancellationTokenSource.Token);
 #endif
-                        
                         if (this.IsDisposed)
                         {
                             return;
                         }
-                        if (receiveResult.MessageType == WebSocketMessageType.Close)
-                        {
-                            this.OnError(ErrorCode.ERR_WebsocketPeerReset);
-                            return;
-                        }
 
-                        if (receiveResult.Count > ushort.MaxValue)
-                        {
-                            await this.webSocket.CloseAsync(WebSocketCloseStatus.MessageTooBig, $"message too big: {receiveResult.Count}", cancellationTokenSource.Token);
-                            this.OnError(ErrorCode.ERR_WebsocketMessageTooBig);
-                            return;
-                        }
-                        
-                        this.recvStream.SetLength(receiveResult.Count);
-                        this.OnRead(this.recvStream);
+                        receiveCount += receiveResult.Count;
                     }
-                    catch (Exception)
+                    while (!receiveResult.EndOfMessage);
+
+                    if (receiveResult.MessageType == WebSocketMessageType.Close)
                     {
-                        this.OnError(ErrorCode.ERR_WebsocketRecvError);
+                        this.OnError(ErrorCode.ERR_WebsocketPeerReset);
                         return;
                     }
+
+                    if (receiveResult.Count > ushort.MaxValue)
+                    {
+                        await this.webSocket.CloseAsync(WebSocketCloseStatus.MessageTooBig, $"message too big: {receiveResult.Count}",
+                            cancellationTokenSource.Token);
+                        this.OnError(ErrorCode.ERR_WebsocketMessageTooBig);
+                        return;
+                    }
+
+                    this.recvStream.SetLength(receiveResult.Count);
+                    this.OnRead(this.recvStream);
                 }
             }
             catch (Exception e)
             {
                 Log.Error(e);
+                this.OnError(ErrorCode.ERR_WebsocketRecvError);
             }
         }
     }