// TChannel.cs (5.0 KB)
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading.Tasks;
  5. using Common.Helper;
  6. using Common.Logger;
  7. using Common.Network;
  8. using MongoDB.Bson;
  namespace TNet
  {
      /// <summary>
      /// Channel that exchanges length-prefixed packets over a <see cref="TSocket"/>.
      /// Outgoing data is appended to a send buffer (a 4-byte length produced by
      /// <see cref="BitConverter.GetBytes(int)"/> followed by the payload) and flushed by
      /// <see cref="StartSend"/>, which is registered as a timer callback on the owning service.
      /// Incoming bytes are appended to a receive buffer and handed to a <see cref="PacketParser"/>.
      /// </summary>
      public class TChannel: AChannel
      {
          // Offset added to the current time when the send callback is registered with the service timer.
          private const int SendInterval = 0;

          // Underlying socket; set to null once the channel has been disposed.
          private TSocket socket;
          private readonly TBuffer recvBuffer = new TBuffer();
          private readonly TBuffer sendBuffer = new TBuffer();

          // Id of the pending send timer, or ObjectId.Empty when no flush is scheduled or running.
          private ObjectId sendTimer = ObjectId.Empty;

          // Invoked by the receive loop when the parser reports a complete packet.
          private Action onParseComplete = () => { };
          private readonly PacketParser parser;
          private readonly string remoteAddress;

          // True between a successful connect (or construction from a live socket) and disconnect/dispose.
          private bool isConnected;

          // Guards Dispose(bool) against running twice, including re-entrant calls made by its own callbacks.
          private bool isDisposed;

          /// <summary>
          /// Wraps a socket that is treated as already connected: the channel is marked connected
          /// and the receive loop is started immediately.
          /// </summary>
          /// <param name="socket">Socket to read from and write to; its RemoteAddress is cached.</param>
          /// <param name="service">Owning service, also passed to the base constructor.</param>
          public TChannel(TSocket socket, TService service): base(service)
          {
              this.isConnected = true;
              this.socket = socket;
              this.service = service;
              this.parser = new PacketParser(this.recvBuffer);
              this.remoteAddress = this.socket.RemoteAddress;
              this.StartRecv();
          }

          /// <summary>
          /// Creates a channel that is not connected yet. Data passed to SendAsync is buffered
          /// and nothing is sent or received until <see cref="ConnectAsync"/> succeeds.
          /// </summary>
          /// <param name="socket">Socket that will be connected later.</param>
          /// <param name="host">Remote host; stored as "host:port".</param>
          /// <param name="port">Remote port; stored as "host:port".</param>
          /// <param name="service">Owning service, also passed to the base constructor.</param>
          public TChannel(TSocket socket, string host, int port, TService service)
                  : base(service)
          {
              this.socket = socket;
              this.service = service;
              this.parser = new PacketParser(this.recvBuffer);
              this.remoteAddress = host + ":" + port;
          }

          /// <summary>
          /// Shared tear-down for <see cref="Dispose()"/> and the finalizer.
          /// </summary>
          /// <param name="disposing">
          /// True when called from <see cref="Dispose()"/>; false when called from the finalizer.
          /// </param>
          private void Dispose(bool disposing)
          {
              if (this.isDisposed || this.socket == null)
              {
                  return;
              }

              // Mark the channel first so that a callback below that calls Dispose() again
              // returns immediately instead of recursing.
              this.isDisposed = true;
              this.isConnected = false;

              if (disposing)
              {
                  // Managed tear-down: notify listeners, close the socket, unregister from the service.
                  // This is skipped on the finalizer path because other managed objects may already
                  // have been finalized and an exception thrown from a finalizer is fatal.
                  this.onDispose(this);
                  this.socket.Dispose();
                  this.service.Remove(this);
              }

              this.socket = null;
          }

          ~TChannel()
          {
              this.Dispose(false);
          }

          /// <summary>
          /// Disposes the socket, notifies the dispose callback and removes the channel from its service.
          /// Calling it more than once is a no-op.
          /// </summary>
          public override void Dispose()
          {
              this.Dispose(true);
              GC.SuppressFinalize(this);
          }

          /// <summary>
          /// Connects the socket to <see cref="RemoteAddress"/>. On success the channel is marked
          /// connected, any buffered data is scheduled for sending and the receive loop is started.
          /// </summary>
          /// <returns>The result reported by the socket; false if the channel was disposed meanwhile.</returns>
          /// <exception cref="ObjectDisposedException">The channel has already been disposed.</exception>
          /// <exception cref="FormatException">The remote address has no ":port" suffix or the port is not a number.</exception>
          public override async Task<bool> ConnectAsync()
          {
              TSocket current = this.socket;
              if (current == null)
              {
                  throw new ObjectDisposedException(this.GetType().Name);
              }

              // Split on the LAST colon so that hosts which themselves contain colons still parse.
              string address = this.RemoteAddress;
              int separator = address.LastIndexOf(':');
              if (separator < 0)
              {
                  throw new FormatException("Remote address must have the form host:port: " + address);
              }
              string host = address.Substring(0, separator);
              int port = int.Parse(
                  address.Substring(separator + 1), System.Globalization.CultureInfo.InvariantCulture);

              bool result = await current.ConnectAsync(host, port);
              if (!result || this.isDisposed)
              {
                  // Do not start sending/receiving on a socket that failed to connect
                  // or that was disposed while the connect was in flight.
                  return false;
              }

              this.isConnected = true;
              this.SetStartSendFlag();
              this.StartRecv();
              return true;
          }

          /// <summary>
          /// Registers <see cref="StartSend"/> with the service timer unless the channel is not
          /// connected or a flush is already scheduled/running.
          /// </summary>
          private void SetStartSendFlag()
          {
              if (!this.isConnected || this.sendTimer != ObjectId.Empty)
              {
                  return;
              }
              this.sendTimer = this.service.Timer.Add(TimeHelper.Now() + SendInterval, this.StartSend);
          }

          /// <summary>
          /// Queues one packet: the payload length followed by the payload. The data is only
          /// buffered here; the actual socket write happens in <see cref="StartSend"/>.
          /// </summary>
          /// <param name="buffer">Packet payload.</param>
          /// <param name="channelID">Not used by this implementation.</param>
          /// <param name="flags">Not used by this implementation.</param>
          /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
          public override void SendAsync(
                  byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
          {
              if (buffer == null)
              {
                  throw new ArgumentNullException("buffer");
              }

              this.sendBuffer.SendTo(BitConverter.GetBytes(buffer.Length));
              this.sendBuffer.SendTo(buffer);
              this.SetStartSendFlag();
          }

          /// <summary>
          /// Queues one packet whose payload is the concatenation of <paramref name="buffers"/>,
          /// prefixed with the combined length.
          /// </summary>
          /// <param name="buffers">Payload fragments, written in list order.</param>
          /// <param name="channelID">Not used by this implementation.</param>
          /// <param name="flags">Not used by this implementation.</param>
          /// <exception cref="ArgumentNullException"><paramref name="buffers"/> is null.</exception>
          public override void SendAsync(
                  List<byte[]> buffers, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
          {
              if (buffers == null)
              {
                  throw new ArgumentNullException("buffers");
              }

              // The total is computed before anything is written, so a bad fragment cannot
              // leave a half-written packet in the send buffer.
              int size = buffers.Sum(b => b.Length);
              this.sendBuffer.SendTo(BitConverter.GetBytes(size));
              foreach (byte[] buffer in buffers)
              {
                  this.sendBuffer.SendTo(buffer);
              }
              this.SetStartSendFlag();
          }

          /// <summary>
          /// Id of the currently scheduled send timer, or ObjectId.Empty when none is pending.
          /// </summary>
          public ObjectId SendTimer
          {
              get
              {
                  return this.sendTimer;
              }
          }

          /// <summary>
          /// Returns the next packet. If the parser already holds a complete packet the task is
          /// completed immediately; otherwise it is completed by the receive loop.
          /// </summary>
          /// <remarks>
          /// Only one pending receive is tracked: a call made while an earlier task is still
          /// pending replaces the stored callback, so the earlier task is never completed.
          /// </remarks>
          public override Task<byte[]> RecvAsync()
          {
              var tcs = new TaskCompletionSource<byte[]>();
              if (this.parser.Parse())
              {
                  tcs.SetResult(this.parser.GetPacket());
              }
              else
              {
                  this.onParseComplete = () => this.ParseComplete(tcs);
              }
              return tcs.Task;
          }

          /// <summary>
          /// Disconnects the socket and, on success, marks the channel as no longer connected.
          /// </summary>
          /// <returns>The result reported by the socket, or false if the channel is already disposed.</returns>
          public override async Task<bool> DisconnnectAsync()
          {
              TSocket current = this.socket;
              if (current == null)
              {
                  return false;
              }

              bool result = await current.DisconnectAsync();
              if (result)
              {
                  this.isConnected = false;
              }
              return result;
          }

          /// <summary>
          /// Remote endpoint: the socket's RemoteAddress for channels built from a live socket,
          /// or "host:port" for channels created for an outgoing connection.
          /// </summary>
          public override string RemoteAddress
          {
              get
              {
                  return this.remoteAddress;
              }
          }

          /// <summary>
          /// Completes a pending receive with the packet the parser has just finished.
          /// </summary>
          private void ParseComplete(TaskCompletionSource<byte[]> tcs)
          {
              byte[] packet = this.parser.GetPacket();
              // Reset the callback before completing the task so that a new receive
              // registered while the task is being completed is not overwritten afterwards.
              this.onParseComplete = () => { };
              tcs.SetResult(packet);
          }

          /// <summary>
          /// Flushes the send buffer to the socket one chunk at a time. Registered as the timer
          /// callback by <see cref="SetStartSendFlag"/>; exceptions are logged, not propagated.
          /// </summary>
          public async void StartSend()
          {
              try
              {
                  while (this.sendBuffer.Count != 0)
                  {
                      TSocket current = this.socket;
                      if (current == null)
                      {
                          // Disposed while a flush was scheduled or in progress.
                          break;
                      }

                      // Never send past the end of the first chunk or past the buffered data.
                      int sendSize = TBuffer.ChunkSize - this.sendBuffer.FirstIndex;
                      if (sendSize > this.sendBuffer.Count)
                      {
                          sendSize = this.sendBuffer.Count;
                      }

                      int n = await current.SendAsync(
                          this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
                      if (n <= 0)
                      {
                          // No progress was made; stop instead of spinning on the same bytes.
                          break;
                      }

                      this.sendBuffer.FirstIndex += n;
                      if (this.sendBuffer.FirstIndex == TBuffer.ChunkSize)
                      {
                          this.sendBuffer.FirstIndex = 0;
                          this.sendBuffer.RemoveFirst();
                      }
                  }
              }
              catch (Exception e)
              {
                  Log.Debug(e.ToString());
              }
              finally
              {
                  // Allow the next SendAsync call to schedule a new flush.
                  this.sendTimer = ObjectId.Empty;
              }
          }

          /// <summary>
          /// Receive loop: reads from the socket into the last chunk of the receive buffer and
          /// notifies the pending receive whenever the parser reports a complete packet. The loop
          /// ends when the socket reports no data, the channel is disposed, or an exception occurs
          /// (which is logged, not propagated).
          /// </summary>
          private async void StartRecv()
          {
              try
              {
                  while (true)
                  {
                      TSocket current = this.socket;
                      if (current == null)
                      {
                          // Disposed between two reads.
                          break;
                      }

                      int n = await current.RecvAsync(
                          this.recvBuffer.Last, this.recvBuffer.LastIndex,
                          TBuffer.ChunkSize - this.recvBuffer.LastIndex);
                      if (n <= 0)
                      {
                          break;
                      }

                      this.recvBuffer.LastIndex += n;
                      if (this.recvBuffer.LastIndex == TBuffer.ChunkSize)
                      {
                          // Current chunk is full: append a fresh one and continue at its start.
                          this.recvBuffer.AddLast();
                          this.recvBuffer.LastIndex = 0;
                      }

                      // Try to parse a packet from what has been received so far.
                      if (this.parser.Parse())
                      {
                          this.onParseComplete();
                      }
                  }
              }
              catch (Exception e)
              {
                  Log.Trace(e.ToString());
              }
          }
      }
  }