|
|
@@ -19,6 +19,8 @@ namespace ETModel
|
|
|
private bool isConnected;
|
|
|
|
|
|
private readonly MemoryStream memoryStream;
|
|
|
+
|
|
|
+ private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
|
|
|
|
|
|
public WChannel(HttpListenerWebSocketContext webSocketContext, AService service): base(service, ChannelType.Accept)
|
|
|
{
|
|
|
@@ -50,8 +52,13 @@ namespace ETModel
|
|
|
{
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
base.Dispose();
|
|
|
|
|
|
+ this.cancellationTokenSource.Cancel();
|
|
|
+ this.cancellationTokenSource.Dispose();
|
|
|
+ this.cancellationTokenSource = null;
|
|
|
+
|
|
|
this.webSocket.Dispose();
|
|
|
|
|
|
this.memoryStream.Dispose();
|
|
|
@@ -84,7 +91,7 @@ namespace ETModel
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- await ((ClientWebSocket)this.webSocket).ConnectAsync(new Uri(url), new CancellationToken());
|
|
|
+ await ((ClientWebSocket)this.webSocket).ConnectAsync(new Uri(url), cancellationTokenSource.Token);
|
|
|
isConnected = true;
|
|
|
this.Start();
|
|
|
}
|
|
|
@@ -109,6 +116,10 @@ namespace ETModel
|
|
|
|
|
|
public async void StartSend()
|
|
|
{
|
|
|
+ if (this.IsDisposed)
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
try
|
|
|
{
|
|
|
if (this.isSending)
|
|
|
@@ -129,7 +140,11 @@ namespace ETModel
|
|
|
byte[] bytes = this.queue.Dequeue();
|
|
|
try
|
|
|
{
|
|
|
- await this.webSocket.SendAsync(new ReadOnlyMemory<byte>(bytes, 0, bytes.Length), WebSocketMessageType.Binary, true, new CancellationToken());
|
|
|
+ await this.webSocket.SendAsync(new ArraySegment<byte>(bytes, 0, bytes.Length), WebSocketMessageType.Binary, true, cancellationTokenSource.Token);
|
|
|
+ if (this.IsDisposed)
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
catch (Exception e)
|
|
|
{
|
|
|
@@ -147,14 +162,21 @@ namespace ETModel
|
|
|
|
|
|
public async void StartRecv()
|
|
|
{
|
|
|
+ if (this.IsDisposed)
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
try
|
|
|
{
|
|
|
while (true)
|
|
|
{
|
|
|
- ValueWebSocketReceiveResult receiveResult;
|
|
|
try
|
|
|
{
|
|
|
- receiveResult = await this.webSocket.ReceiveAsync(new Memory<byte>(this.Stream.GetBuffer(), 0, this.Stream.Capacity), new CancellationToken());
|
|
|
+ var receiveResult = await this.webSocket.ReceiveAsync(new ArraySegment<byte>(this.Stream.GetBuffer(), 0, this.Stream.Capacity), cancellationTokenSource.Token);
|
|
|
+ if (this.IsDisposed)
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
if (receiveResult.MessageType == WebSocketMessageType.Close)
|
|
|
{
|
|
|
this.OnError(ErrorCode.ERR_WebsocketPeerReset);
|
|
|
@@ -163,20 +185,19 @@ namespace ETModel
|
|
|
|
|
|
if (receiveResult.Count > ushort.MaxValue)
|
|
|
{
|
|
|
- await this.webSocket.CloseAsync(WebSocketCloseStatus.MessageTooBig, $"message too big: {receiveResult.Count}", new CancellationToken());
|
|
|
+ await this.webSocket.CloseAsync(WebSocketCloseStatus.MessageTooBig, $"message too big: {receiveResult.Count}", cancellationTokenSource.Token);
|
|
|
this.OnError(ErrorCode.ERR_WebsocketMessageTooBig);
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
+ this.Stream.SetLength(receiveResult.Count);
|
|
|
+ this.OnRead(this.Stream);
|
|
|
}
|
|
|
catch (Exception e)
|
|
|
{
|
|
|
- Log.Error(e);
|
|
|
this.OnError(ErrorCode.ERR_WebsocketRecvError);
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
- this.Stream.SetLength(receiveResult.Count);
|
|
|
- this.OnRead(this.Stream);
|
|
|
}
|
|
|
}
|
|
|
catch (Exception e)
|