|
|
@@ -1,6 +1,7 @@
|
|
|
using System;
|
|
|
using System.Collections.Generic;
|
|
|
using System.Linq;
|
|
|
+using System.Net;
|
|
|
using System.Net.Sockets;
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
@@ -8,7 +9,7 @@ namespace Model
|
|
|
{
|
|
|
public class TChannel : AChannel
|
|
|
{
|
|
|
- private readonly TSocket socket;
|
|
|
+ private readonly TcpClient tcpClient;
|
|
|
|
|
|
private readonly TBuffer recvBuffer = new TBuffer();
|
|
|
private readonly TBuffer sendBuffer = new TBuffer();
|
|
|
@@ -21,32 +22,47 @@ namespace Model
|
|
|
/// <summary>
|
|
|
/// connect
|
|
|
/// </summary>
|
|
|
- public TChannel(TSocket socket, string host, int port, TService service) : base(service, ChannelType.Connect)
|
|
|
+ public TChannel(TcpClient tcpClient, string host, int port, TService service) : base(service, ChannelType.Connect)
|
|
|
{
|
|
|
- this.socket = socket;
|
|
|
+ this.tcpClient = tcpClient;
|
|
|
this.parser = new PacketParser(this.recvBuffer);
|
|
|
this.RemoteAddress = host + ":" + port;
|
|
|
-
|
|
|
- bool result = this.socket.ConnectAsync(host, port);
|
|
|
- if (!result)
|
|
|
- {
|
|
|
- this.OnConnected(this.Id, SocketError.Success);
|
|
|
- return;
|
|
|
- }
|
|
|
- this.socket.OnConn += e => OnConnected(this.Id, e);
|
|
|
+
|
|
|
+ this.ConnectAsync(host, port);
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
/// accept
|
|
|
/// </summary>
|
|
|
- public TChannel(TSocket socket, TService service) : base(service, ChannelType.Accept)
|
|
|
+ public TChannel(TcpClient tcpClient, TService service) : base(service, ChannelType.Accept)
|
|
|
{
|
|
|
- this.socket = socket;
|
|
|
+ this.tcpClient = tcpClient;
|
|
|
this.parser = new PacketParser(this.recvBuffer);
|
|
|
- this.RemoteAddress = socket.RemoteAddress;
|
|
|
+
|
|
|
+ IPEndPoint ipEndPoint = (IPEndPoint)this.tcpClient.Client.RemoteEndPoint;
|
|
|
+ this.RemoteAddress = ipEndPoint.Address + ":" + ipEndPoint.Port;
|
|
|
this.OnAccepted();
|
|
|
}
|
|
|
|
|
|
+ private async void ConnectAsync(string host, int port)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ await this.tcpClient.ConnectAsync(host, port);
|
|
|
+ this.isConnected = true;
|
|
|
+ this.StartSend();
|
|
|
+ this.StartRecv();
|
|
|
+ }
|
|
|
+ catch (SocketException e)
|
|
|
+ {
|
|
|
+ Log.Error($"connect error: {e.SocketErrorCode}");
|
|
|
+ }
|
|
|
+ catch (Exception e)
|
|
|
+ {
|
|
|
+ Log.Error(e.ToString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public override void Dispose()
|
|
|
{
|
|
|
if (this.Id == 0)
|
|
|
@@ -58,7 +74,7 @@ namespace Model
|
|
|
|
|
|
base.Dispose();
|
|
|
|
|
|
- this.socket.Dispose();
|
|
|
+ this.tcpClient.Close();
|
|
|
this.service.Remove(id);
|
|
|
}
|
|
|
|
|
|
@@ -69,32 +85,16 @@ namespace Model
|
|
|
this.StartRecv();
|
|
|
}
|
|
|
|
|
|
- private void OnConnected(long channelId, SocketError error)
|
|
|
- {
|
|
|
- if (this.service.GetChannel(channelId) == null)
|
|
|
- {
|
|
|
- return;
|
|
|
- }
|
|
|
- if (error != SocketError.Success)
|
|
|
- {
|
|
|
- Log.Error($"connect error: {error}");
|
|
|
- return;
|
|
|
- }
|
|
|
- this.isConnected = true;
|
|
|
- this.StartSend();
|
|
|
- this.StartRecv();
|
|
|
- }
|
|
|
-
|
|
|
public override void Send(byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
|
|
|
{
|
|
|
if (this.Id == 0)
|
|
|
{
|
|
|
throw new Exception("TChannel已经被Dispose, 不能发送消息");
|
|
|
}
|
|
|
- byte[] size = BitConverter.GetBytes(buffer.Length);
|
|
|
+ byte[] size = BitConverter.GetBytes((ushort)buffer.Length);
|
|
|
this.sendBuffer.SendTo(size);
|
|
|
this.sendBuffer.SendTo(buffer);
|
|
|
- if (!this.isSending && this.isConnected)
|
|
|
+ if (this.isConnected)
|
|
|
{
|
|
|
this.StartSend();
|
|
|
}
|
|
|
@@ -106,123 +106,115 @@ namespace Model
|
|
|
{
|
|
|
throw new Exception("TChannel已经被Dispose, 不能发送消息");
|
|
|
}
|
|
|
- int size = buffers.Select(b => b.Length).Sum();
|
|
|
+ ushort size = (ushort)buffers.Select(b => b.Length).Sum();
|
|
|
+ size = NetworkHelper.HostToNetworkOrder(size);
|
|
|
byte[] sizeBuffer = BitConverter.GetBytes(size);
|
|
|
this.sendBuffer.SendTo(sizeBuffer);
|
|
|
foreach (byte[] buffer in buffers)
|
|
|
{
|
|
|
this.sendBuffer.SendTo(buffer);
|
|
|
}
|
|
|
- if (!this.isSending && this.isConnected)
|
|
|
+ if (this.isConnected)
|
|
|
{
|
|
|
this.StartSend();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void StartSend()
|
|
|
- {
|
|
|
- if (this.Id == 0)
|
|
|
- {
|
|
|
- return;
|
|
|
- }
|
|
|
- // 没有数据需要发送
|
|
|
- if (this.sendBuffer.Count == 0)
|
|
|
- {
|
|
|
- this.isSending = false;
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- this.isSending = true;
|
|
|
-
|
|
|
- int sendSize = TBuffer.ChunkSize - this.sendBuffer.FirstIndex;
|
|
|
- if (sendSize > this.sendBuffer.Count)
|
|
|
- {
|
|
|
- sendSize = this.sendBuffer.Count;
|
|
|
- }
|
|
|
-
|
|
|
- if (!this.socket.SendAsync(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize))
|
|
|
- {
|
|
|
- this.OnSend(sendSize, SocketError.Success);
|
|
|
- return;
|
|
|
- }
|
|
|
- this.socket.OnSend = this.OnSend;
|
|
|
- }
|
|
|
-
|
|
|
- private void OnSend(int n, SocketError error)
|
|
|
+ private async void StartSend()
|
|
|
{
|
|
|
- if (this.Id == 0)
|
|
|
- {
|
|
|
- return;
|
|
|
- }
|
|
|
- this.socket.OnSend = null;
|
|
|
- if (error != SocketError.Success)
|
|
|
- {
|
|
|
- this.OnError(this, error);
|
|
|
- return;
|
|
|
- }
|
|
|
- this.sendBuffer.FirstIndex += n;
|
|
|
- if (this.sendBuffer.FirstIndex == TBuffer.ChunkSize)
|
|
|
+ try
|
|
|
{
|
|
|
- this.sendBuffer.FirstIndex = 0;
|
|
|
- this.sendBuffer.RemoveFirst();
|
|
|
- }
|
|
|
-
|
|
|
- this.StartSend();
|
|
|
- }
|
|
|
+ // 如果正在发送中,不需要再次发送
|
|
|
+ if (this.isSending)
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- private void StartRecv()
|
|
|
- {
|
|
|
- if (this.Id == 0)
|
|
|
- {
|
|
|
- return;
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ if (this.Id == 0)
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 没有数据需要发送
|
|
|
+ if (this.sendBuffer.Count == 0)
|
|
|
+ {
|
|
|
+ this.isSending = false;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ this.isSending = true;
|
|
|
+
|
|
|
+ int sendSize = TBuffer.ChunkSize - this.sendBuffer.FirstIndex;
|
|
|
+ if (sendSize > this.sendBuffer.Count)
|
|
|
+ {
|
|
|
+ sendSize = this.sendBuffer.Count;
|
|
|
+ }
|
|
|
+ await this.tcpClient.GetStream().WriteAsync(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
|
|
|
+ this.sendBuffer.FirstIndex += sendSize;
|
|
|
+ if (this.sendBuffer.FirstIndex == TBuffer.ChunkSize)
|
|
|
+ {
|
|
|
+ this.sendBuffer.FirstIndex = 0;
|
|
|
+ this.sendBuffer.RemoveFirst();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- int size = TBuffer.ChunkSize - this.recvBuffer.LastIndex;
|
|
|
-
|
|
|
- if (!this.socket.RecvAsync(this.recvBuffer.Last, this.recvBuffer.LastIndex, size))
|
|
|
+ catch (Exception e)
|
|
|
{
|
|
|
- this.OnRecv(size, SocketError.Success);
|
|
|
+ Log.Error(e.ToString());
|
|
|
+ this.OnError(this, SocketError.SocketError);
|
|
|
}
|
|
|
- this.socket.OnRecv = this.OnRecv;
|
|
|
}
|
|
|
|
|
|
- private void OnRecv(int n, SocketError error)
|
|
|
+ private async void StartRecv()
|
|
|
{
|
|
|
- if (this.Id == 0)
|
|
|
- {
|
|
|
- return;
|
|
|
- }
|
|
|
- this.socket.OnRecv = null;
|
|
|
- if (error != SocketError.Success)
|
|
|
- {
|
|
|
- this.OnError(this, error);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- if (n == 0)
|
|
|
+ try
|
|
|
{
|
|
|
- this.OnError(this, error);
|
|
|
- return;
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ if (this.Id == 0)
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int size = TBuffer.ChunkSize - this.recvBuffer.LastIndex;
|
|
|
+
|
|
|
+ int n = await this.tcpClient.GetStream().ReadAsync(this.recvBuffer.Last, this.recvBuffer.LastIndex, size);
|
|
|
+
|
|
|
+ if (n == 0)
|
|
|
+ {
|
|
|
+ this.OnError(this, SocketError.NetworkReset);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ this.recvBuffer.LastIndex += n;
|
|
|
+
|
|
|
+ if (this.recvBuffer.LastIndex == TBuffer.ChunkSize)
|
|
|
+ {
|
|
|
+ this.recvBuffer.AddLast();
|
|
|
+ this.recvBuffer.LastIndex = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (this.recvTcs != null)
|
|
|
+ {
|
|
|
+ byte[] packet = this.parser.GetPacket();
|
|
|
+ if (packet != null)
|
|
|
+ {
|
|
|
+ var tcs = this.recvTcs;
|
|
|
+ this.recvTcs = null;
|
|
|
+ tcs.SetResult(packet);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- this.recvBuffer.LastIndex += n;
|
|
|
- if (this.recvBuffer.LastIndex == TBuffer.ChunkSize)
|
|
|
+ catch (ObjectDisposedException)
|
|
|
{
|
|
|
- this.recvBuffer.AddLast();
|
|
|
- this.recvBuffer.LastIndex = 0;
|
|
|
}
|
|
|
-
|
|
|
- if (this.recvTcs != null)
|
|
|
+ catch (Exception e)
|
|
|
{
|
|
|
- byte[] packet = this.parser.GetPacket();
|
|
|
- if (packet != null)
|
|
|
- {
|
|
|
- var tcs = this.recvTcs;
|
|
|
- this.recvTcs = null;
|
|
|
- tcs.SetResult(packet);
|
|
|
- }
|
|
|
+ Log.Error(e.ToString());
|
|
|
+ this.OnError(this, SocketError.SocketError);
|
|
|
}
|
|
|
-
|
|
|
- StartRecv();
|
|
|
}
|
|
|
|
|
|
public override Task<byte[]> Recv()
|
|
|
@@ -231,13 +223,13 @@ namespace Model
|
|
|
{
|
|
|
throw new Exception("TChannel已经被Dispose, 不能接收消息");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
byte[] packet = this.parser.GetPacket();
|
|
|
if (packet != null)
|
|
|
{
|
|
|
return Task.FromResult(packet);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
recvTcs = new TaskCompletionSource<byte[]>();
|
|
|
return recvTcs.Task;
|
|
|
}
|