| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- #if !UNITY_WEBGL
- using System;
- using System.Collections.Generic;
- using System.IO;
- using System.Net;
- using System.Net.WebSockets;
- using System.Threading;
- using System.Threading.Tasks;
- namespace ET
- {
- public class WChannel: AChannel
- {
- private readonly WService Service;
- private readonly WebSocket webSocket;
- private readonly Queue<MemoryStream> queue = new Queue<MemoryStream>();
- private bool isSending;
- private bool isConnected;
- private readonly MemoryStream recvStream;
- private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
- public WChannel(long id, WebSocket webSocket, WService service, IPEndPoint remoteAddress = null)
- {
- this.Id = id;
- this.Service = service;
- this.ChannelType = ChannelType.Accept;
- this.webSocket = webSocket;
- this.recvStream = new MemoryStream(ushort.MaxValue);
- this.RemoteAddress = remoteAddress;
- isConnected = true;
-
- this.Service.ThreadSynchronizationContext.PostNext(()=>
- {
- this.StartRecv().Coroutine();
- this.StartSend().Coroutine();
- });
- }
- public WChannel(long id, WebSocket webSocket, string connectUrl, WService service)
- {
- this.Id = id;
- this.Service = service;
- this.ChannelType = ChannelType.Connect;
- this.webSocket = webSocket;
- this.recvStream = new MemoryStream(ushort.MaxValue);
- isConnected = false;
- this.Service.ThreadSynchronizationContext.Post(() => this.ConnectAsync(connectUrl).Coroutine());
- }
- public override void Dispose()
- {
- if (this.IsDisposed)
- {
- return;
- }
- this.Id = 0;
- this.cancellationTokenSource.Cancel();
- this.cancellationTokenSource.Dispose();
- this.webSocket.Dispose();
- }
- public async ETTask ConnectAsync(string url)
- {
- try
- {
- await ((ClientWebSocket) this.webSocket).ConnectAsync(new Uri(url), cancellationTokenSource.Token);
- isConnected = true;
-
- this.StartRecv().Coroutine();
- this.StartSend().Coroutine();
- }
- catch (Exception e)
- {
- Log.Error(e);
- this.OnError(ErrorCore.ERR_WebsocketConnectError);
- }
- }
- public void Send(MemoryStream stream)
- {
- switch (this.Service.ServiceType)
- {
- case ServiceType.Inner:
- break;
- case ServiceType.Outer:
- stream.Seek(Packet.ActorIdLength, SeekOrigin.Begin);
- break;
- }
- this.queue.Enqueue(stream);
- if (this.isConnected)
- {
- this.StartSend().Coroutine();
- }
- }
- public async ETTask StartSend()
- {
- if (this.IsDisposed)
- {
- return;
- }
- try
- {
- if (this.isSending)
- {
- return;
- }
- this.isSending = true;
- while (true)
- {
- if (this.queue.Count == 0)
- {
- this.isSending = false;
- return;
- }
- MemoryStream bytes = this.queue.Dequeue();
- try
- {
- await this.webSocket.SendAsync(new ReadOnlyMemory<byte>(bytes.GetBuffer(), (int)bytes.Position, (int)(bytes.Length - bytes.Position)), WebSocketMessageType.Binary, true, cancellationTokenSource.Token);
- if (this.IsDisposed)
- {
- return;
- }
- }
- catch (Exception e)
- {
- Log.Error(e);
- this.OnError(ErrorCore.ERR_WebsocketSendError);
- return;
- }
- }
- }
- catch (Exception e)
- {
- Log.Error(e);
- }
- }
- private byte[] cache = new byte[ushort.MaxValue];
- public async ETTask StartRecv()
- {
- if (this.IsDisposed)
- {
- return;
- }
- try
- {
- while (true)
- {
- ValueWebSocketReceiveResult receiveResult;
- int receiveCount = 0;
- do
- {
- receiveResult = await this.webSocket.ReceiveAsync(
- new Memory<byte>(cache, receiveCount, this.cache.Length - receiveCount),
- cancellationTokenSource.Token);
- if (this.IsDisposed)
- {
- return;
- }
- receiveCount += receiveResult.Count;
- }
- while (!receiveResult.EndOfMessage);
- if (receiveResult.MessageType == WebSocketMessageType.Close)
- {
- this.OnError(ErrorCore.ERR_WebsocketPeerReset);
- return;
- }
- if (receiveResult.Count > ushort.MaxValue)
- {
- await this.webSocket.CloseAsync(WebSocketCloseStatus.MessageTooBig, $"message too big: {receiveCount}",
- cancellationTokenSource.Token);
- this.OnError(ErrorCore.ERR_WebsocketMessageTooBig);
- return;
- }
-
- this.recvStream.SetLength(receiveCount);
- this.recvStream.Seek(2, SeekOrigin.Begin);
- Array.Copy(this.cache, 0, this.recvStream.GetBuffer(), 0, receiveCount);
- this.OnRead(this.recvStream);
- }
- }
- catch (WebSocketException)
- {
- this.OnError(ErrorCore.ERR_WebsocketRecvError);
- }
- catch (TaskCanceledException)
- {
- this.OnError(ErrorCore.ERR_WebsocketTaskCanceledError);
- }
- catch (Exception e)
- {
- Log.Error(e);
- this.OnError(ErrorCore.ERR_WebsocketOtherError);
- }
- }
-
- private void OnRead(MemoryStream memoryStream)
- {
- try
- {
- long channelId = this.Id;
- this.Service.OnRead(channelId, memoryStream);
- }
- catch (Exception e)
- {
- Log.Error($"{this.RemoteAddress} {memoryStream.Length} {e}");
- // 出现任何消息解析异常都要断开Session,防止客户端伪造消息
- this.OnError(ErrorCore.ERR_PacketParserError);
- }
- }
-
- private void OnError(int error)
- {
- Log.Debug($"WChannel error: {error} {this.RemoteAddress}");
-
- long channelId = this.Id;
-
- this.Service.OnError(channelId, error);
- }
- }
- }
- #endif
|