TChannel.cs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.Sockets;
  5. namespace Base
  6. {
  /// <summary>
  /// <see cref="AChannel"/> implementation that exchanges packets over a <see cref="TSocket"/>.
  /// Outgoing packets are written to <c>sendBuffer</c> as a 4-byte length prefix
  /// (<see cref="BitConverter.GetBytes(int)"/>, i.e. host byte order) followed by the payload,
  /// and are drained chunk by chunk by <c>StartSend</c>/<c>OnSend</c>. Incoming bytes are
  /// appended to <c>recvBuffer</c> by <c>StartRecv</c>/<c>OnRecv</c> and decoded on demand by
  /// the <see cref="PacketParser"/> in <see cref="Recv"/>.
  /// </summary>
  public class TChannel : AChannel
  {
      private readonly TSocket socket;
      private readonly TBuffer recvBuffer = new TBuffer();
      private readonly TBuffer sendBuffer = new TBuffer();
      private readonly PacketParser parser;

      // True while a send is in flight; cleared by StartSend once sendBuffer is empty.
      private bool isSending;

      // Set once OnConnected has observed a successful connect; Send only queues data before that.
      private bool isConnected;

      /// <summary>
      /// Optional callback invoked with the channel id and the socket error when a send or a
      /// receive fails. May be left unassigned.
      /// </summary>
      public Action<long, SocketError> OnError;

      /// <summary>Remote endpoint in <c>host:port</c> form.</summary>
      public string RemoteAddress { get; }

      /// <summary>
      /// Creates a channel for a socket that will later be connected to
      /// <paramref name="host"/>:<paramref name="port"/> through <see cref="ConnectAsync"/>.
      /// </summary>
      /// <param name="socket">Socket used for all I/O of this channel.</param>
      /// <param name="host">Remote host name or address.</param>
      /// <param name="port">Remote port.</param>
      /// <param name="service">Owning service, forwarded to the base class.</param>
      public TChannel(TSocket socket, string host, int port, TService service) : base(service)
      {
          this.socket = socket;
          this.parser = new PacketParser(this.recvBuffer);
          this.RemoteAddress = host + ":" + port;
      }

      /// <summary>
      /// Creates a channel around <paramref name="socket"/>, taking the remote address
      /// from the socket itself.
      /// </summary>
      /// <param name="socket">Socket used for all I/O of this channel.</param>
      /// <param name="service">Owning service, forwarded to the base class.</param>
      public TChannel(TSocket socket, TService service) : base(service)
      {
          this.socket = socket;
          this.parser = new PacketParser(this.recvBuffer);
          this.RemoteAddress = socket.RemoteAddress;
      }

      /// <summary>
      /// Disposes the channel and its socket and removes the channel from the owning service.
      /// An <c>Id</c> of 0 is treated as "already disposed", which makes repeated calls a no-op.
      /// </summary>
      public override void Dispose()
      {
          if (this.Id == 0)
          {
              return;
          }

          // Capture the id before base.Dispose(); presumably the base class resets it — TODO confirm.
          long id = this.Id;

          base.Dispose();
          this.socket.Dispose();
          this.service.Remove(id);
      }

      /// <summary>
      /// Starts connecting the socket to <see cref="RemoteAddress"/>.
      /// </summary>
      public override void ConnectAsync()
      {
          string[] ss = this.RemoteAddress.Split(':');
          int port = int.Parse(ss[1]);

          bool pending = this.socket.ConnectAsync(ss[0], port);
          if (!pending)
          {
              // A false return is treated as "already completed": report success right away.
              this.OnConnected(this.Id, SocketError.Success);
              return;
          }

          // NOTE(review): the handler is attached after the connect was started and is never
          // detached; confirm TSocket cannot raise OnConn before this line runs.
          this.socket.OnConn += e => OnConnected(this.Id, e);
      }

      /// <summary>
      /// Connect completion handler: on success marks the channel connected and starts
      /// the send and receive pumps. A failed connect is only logged.
      /// </summary>
      /// <param name="channelId">Id of the channel the connect was issued for.</param>
      /// <param name="error">Result of the connect attempt.</param>
      private void OnConnected(long channelId, SocketError error)
      {
          // The channel is no longer registered with the service: ignore the completion.
          if (this.service.GetChannel(channelId) == null)
          {
              return;
          }

          if (error != SocketError.Success)
          {
              Log.Error($"connect error: {error}");
              return;
          }

          this.isConnected = true;
          this.StartSend();
          this.StartRecv();
      }

      /// <summary>
      /// Queues one packet: a 4-byte length prefix followed by <paramref name="buffer"/>.
      /// The data is sent immediately if the channel is connected and idle; otherwise it stays
      /// buffered until the pending send completes or the connection is established.
      /// </summary>
      /// <param name="buffer">Packet payload.</param>
      /// <param name="channelID">Not used by this implementation.</param>
      /// <param name="flags">Not used by this implementation.</param>
      /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
      public override void Send(byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
      {
          if (buffer == null)
          {
              throw new ArgumentNullException(nameof(buffer));
          }

          byte[] size = BitConverter.GetBytes(buffer.Length);
          this.sendBuffer.SendTo(size);
          this.sendBuffer.SendTo(buffer);

          this.StartSendIfIdle();
      }

      /// <summary>
      /// Queues one packet whose payload is the concatenation of <paramref name="buffers"/>:
      /// a single 4-byte prefix holding the total length, followed by every segment in order.
      /// </summary>
      /// <param name="buffers">Payload segments, written in list order.</param>
      /// <param name="channelID">Not used by this implementation.</param>
      /// <param name="flags">Not used by this implementation.</param>
      /// <exception cref="ArgumentNullException"><paramref name="buffers"/> is null.</exception>
      public override void Send(List<byte[]> buffers, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
      {
          if (buffers == null)
          {
              throw new ArgumentNullException(nameof(buffers));
          }

          // The total is computed before anything is written, so a bad segment cannot leave
          // a length prefix without its payload in the send buffer.
          int size = buffers.Sum(b => b.Length);
          byte[] sizeBuffer = BitConverter.GetBytes(size);
          this.sendBuffer.SendTo(sizeBuffer);

          foreach (byte[] buffer in buffers)
          {
              this.sendBuffer.SendTo(buffer);
          }

          this.StartSendIfIdle();
      }

      /// <summary>
      /// Returns the next complete packet from the receive buffer.
      /// </summary>
      /// <returns>The packet bytes, or <c>null</c> when the parser has no complete packet yet.</returns>
      public override byte[] Recv()
      {
          return this.parser.Parse() ? this.parser.GetPacket() : null;
      }

      /// <summary>
      /// Starts the send pump unless a send is already in flight or the channel is not connected yet.
      /// </summary>
      private void StartSendIfIdle()
      {
          if (this.isConnected && !this.isSending)
          {
              this.StartSend();
          }
      }

      /// <summary>
      /// Invokes <see cref="OnError"/> with this channel's id, if a handler is assigned.
      /// </summary>
      /// <param name="error">The socket error to report.</param>
      private void NotifyError(SocketError error)
      {
          // OnError is an optional public field; invoking it unguarded would throw
          // NullReferenceException from inside the I/O callback when nobody subscribed.
          this.OnError?.Invoke(this.Id, error);
      }

      /// <summary>
      /// Sends the next slice of <c>sendBuffer</c>: at most the remainder of the first chunk.
      /// Clears <c>isSending</c> and returns when the buffer is empty.
      /// </summary>
      private void StartSend()
      {
          // Nothing left to send.
          if (this.sendBuffer.Count == 0)
          {
              this.isSending = false;
              return;
          }

          this.isSending = true;

          // Bytes remaining in the first chunk, capped at the number of buffered bytes.
          int sendSize = TBuffer.ChunkSize - this.sendBuffer.FirstIndex;
          if (sendSize > this.sendBuffer.Count)
          {
              sendSize = this.sendBuffer.Count;
          }

          if (!this.socket.SendAsync(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize))
          {
              // A false return is treated as synchronous completion.
              // NOTE(review): this reports the requested size as the sent byte count — confirm
              // against TSocket that a synchronous completion always transfers everything.
              this.OnSend(sendSize, SocketError.Success);
              return;
          }

          this.socket.OnSend = this.OnSend;
      }

      /// <summary>
      /// Send completion handler: consumes <paramref name="n"/> bytes from the send buffer
      /// and continues with the next slice.
      /// </summary>
      /// <param name="n">Number of bytes reported as sent.</param>
      /// <param name="error">Result of the send.</param>
      private void OnSend(int n, SocketError error)
      {
          // Channel already disposed.
          if (this.Id == 0)
          {
              return;
          }

          this.socket.OnSend = null;

          if (error != SocketError.Success)
          {
              Log.Info($"socket send fail, error: {error}, n: {n}");
              this.NotifyError(error);
              return;
          }

          this.sendBuffer.FirstIndex += n;

          // First chunk fully consumed: drop it and restart at the beginning of the next one.
          if (this.sendBuffer.FirstIndex == TBuffer.ChunkSize)
          {
              this.sendBuffer.FirstIndex = 0;
              this.sendBuffer.RemoveFirst();
          }

          this.StartSend();
      }

      /// <summary>
      /// Issues a receive into the free tail of the last chunk of <c>recvBuffer</c>.
      /// </summary>
      private void StartRecv()
      {
          int size = TBuffer.ChunkSize - this.recvBuffer.LastIndex;

          if (!this.socket.RecvAsync(this.recvBuffer.Last, this.recvBuffer.LastIndex, size))
          {
              // A false return is treated as synchronous completion.
              // NOTE(review): this reports the requested capacity as the received byte count —
              // confirm against TSocket; a short read would corrupt LastIndex.
              this.OnRecv(size, SocketError.Success);

              // OnRecv has already re-armed the next receive (or bailed out on dispose/error);
              // do not fall through and attach the callback again.
              return;
          }

          this.socket.OnRecv = this.OnRecv;
      }

      /// <summary>
      /// Receive completion handler: accounts for <paramref name="n"/> received bytes,
      /// appends a new chunk when the last one is full, and issues the next receive.
      /// </summary>
      /// <param name="n">Number of bytes reported as received.</param>
      /// <param name="error">Result of the receive.</param>
      private void OnRecv(int n, SocketError error)
      {
          // Channel already disposed.
          if (this.Id == 0)
          {
              return;
          }

          this.socket.OnRecv = null;

          if (error != SocketError.Success)
          {
              Log.Info($"socket recv fail, error: {error}, {n}");
              this.NotifyError(error);
              return;
          }

          this.recvBuffer.LastIndex += n;

          // Last chunk is full: append a fresh one and write from its start.
          if (this.recvBuffer.LastIndex == TBuffer.ChunkSize)
          {
              this.recvBuffer.AddLast();
              this.recvBuffer.LastIndex = 0;
          }

          this.StartRecv();
      }
  }
  172. }