TChannel.cs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading.Tasks;
  5. using Common.Helper;
  6. using Common.Logger;
  7. using Common.Network;
  8. using MongoDB.Bson;
  9. namespace TNet
  10. {
  11. internal class TChannel: AChannel
  12. {
  13. private const int SendInterval = 0;
  14. private TSocket socket;
  15. private readonly TBuffer recvBuffer = new TBuffer();
  16. private readonly TBuffer sendBuffer = new TBuffer();
  17. private ObjectId sendTimer = ObjectId.Empty;
  18. private Action onParseComplete = () => { };
  19. private readonly PacketParser parser;
  20. private readonly string remoteAddress;
  21. public TChannel(TSocket socket, TService service): base(service)
  22. {
  23. this.socket = socket;
  24. this.service = service;
  25. this.parser = new PacketParser(this.recvBuffer);
  26. this.remoteAddress = this.socket.RemoteAddress;
  27. this.StartRecv();
  28. }
  29. private void Dispose(bool disposing)
  30. {
  31. if (this.socket == null)
  32. {
  33. return;
  34. }
  35. this.onDispose(this);
  36. if (disposing)
  37. {
  38. // 释放托管的资源
  39. this.socket.Dispose();
  40. }
  41. // 释放非托管资源
  42. this.service.Remove(this);
  43. this.socket = null;
  44. }
  45. ~TChannel()
  46. {
  47. this.Dispose(false);
  48. }
  49. public override void Dispose()
  50. {
  51. this.Dispose(true);
  52. GC.SuppressFinalize(this);
  53. }
  54. public override void SendAsync(
  55. byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
  56. {
  57. byte[] size = BitConverter.GetBytes(buffer.Length);
  58. this.sendBuffer.SendTo(size);
  59. this.sendBuffer.SendTo(buffer);
  60. if (this.sendTimer == ObjectId.Empty)
  61. {
  62. this.sendTimer = this.service.Timer.Add(TimeHelper.Now() + SendInterval, this.StartSend);
  63. }
  64. }
  65. public override void SendAsync(
  66. List<byte[]> buffers, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
  67. {
  68. int size = buffers.Select(b => b.Length).Sum();
  69. byte[] sizeBuffer = BitConverter.GetBytes(size);
  70. this.sendBuffer.SendTo(sizeBuffer);
  71. foreach (byte[] buffer in buffers)
  72. {
  73. this.sendBuffer.SendTo(buffer);
  74. }
  75. if (this.sendTimer == ObjectId.Empty)
  76. {
  77. this.sendTimer = this.service.Timer.Add(TimeHelper.Now() + SendInterval, this.StartSend);
  78. }
  79. }
  80. public ObjectId SendTimer
  81. {
  82. get
  83. {
  84. return this.sendTimer;
  85. }
  86. }
  87. public override Task<byte[]> RecvAsync()
  88. {
  89. var tcs = new TaskCompletionSource<byte[]>();
  90. if (this.parser.Parse())
  91. {
  92. tcs.SetResult(this.parser.GetPacket());
  93. }
  94. else
  95. {
  96. this.onParseComplete = () => this.ParseComplete(tcs);
  97. }
  98. return tcs.Task;
  99. }
  100. public override async Task<bool> DisconnnectAsync()
  101. {
  102. return await this.socket.DisconnectAsync();
  103. }
  104. public override string RemoteAddress
  105. {
  106. get
  107. {
  108. return this.remoteAddress;
  109. }
  110. }
  111. private void ParseComplete(TaskCompletionSource<byte[]> tcs)
  112. {
  113. byte[] packet = this.parser.GetPacket();
  114. this.onParseComplete = () => { };
  115. tcs.SetResult(packet);
  116. }
  117. private async void StartSend()
  118. {
  119. try
  120. {
  121. while (true)
  122. {
  123. if (this.sendBuffer.Count == 0)
  124. {
  125. break;
  126. }
  127. int sendSize = TBuffer.ChunkSize - this.sendBuffer.FirstIndex;
  128. if (sendSize > this.sendBuffer.Count)
  129. {
  130. sendSize = this.sendBuffer.Count;
  131. }
  132. int n =
  133. await this.socket.SendAsync(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
  134. this.sendBuffer.FirstIndex += n;
  135. if (this.sendBuffer.FirstIndex == TBuffer.ChunkSize)
  136. {
  137. this.sendBuffer.FirstIndex = 0;
  138. this.sendBuffer.RemoveFirst();
  139. }
  140. }
  141. }
  142. catch (Exception e)
  143. {
  144. Log.Debug(e.ToString());
  145. }
  146. this.sendTimer = ObjectId.Empty;
  147. }
  148. private async void StartRecv()
  149. {
  150. try
  151. {
  152. while (true)
  153. {
  154. int n =
  155. await
  156. this.socket.RecvAsync(this.recvBuffer.Last, this.recvBuffer.LastIndex,
  157. TBuffer.ChunkSize - this.recvBuffer.LastIndex);
  158. if (n == 0)
  159. {
  160. break;
  161. }
  162. this.recvBuffer.LastIndex += n;
  163. if (this.recvBuffer.LastIndex == TBuffer.ChunkSize)
  164. {
  165. this.recvBuffer.AddLast();
  166. this.recvBuffer.LastIndex = 0;
  167. }
  168. // 解析封包
  169. if (this.parser.Parse())
  170. {
  171. this.onParseComplete();
  172. }
  173. }
  174. }
  175. catch (Exception e)
  176. {
  177. Log.Trace(e.ToString());
  178. }
  179. }
  180. }
  181. }