// TChannel.cs
  1. using System;
  2. using System.Threading.Tasks;
  3. using Common.Helper;
  4. using Common.Logger;
  5. using Common.Network;
  6. using MongoDB.Bson;
  7. namespace TNet
  8. {
  /// <summary>
  /// <see cref="AChannel"/> implementation on top of a <see cref="TSocket"/>.
  /// Outgoing packets are queued in a send buffer, each preceded by the
  /// 4 bytes returned by <c>BitConverter.GetBytes(buffer.Length)</c>, and are
  /// flushed by a timer callback. Incoming bytes are accumulated in a receive
  /// buffer and split into packets by a <see cref="PacketParser"/>.
  /// </summary>
  internal class TChannel: AChannel
  {
    // Offset added to TimeHelper.Now() when scheduling a flush of the send buffer.
    private const int SendInterval = 0;

    private readonly TService service;

    // Set to null by Dispose(bool); a null socket marks the channel as disposed.
    private TSocket socket;

    private readonly TBuffer recvBuffer = new TBuffer();
    private readonly TBuffer sendBuffer = new TBuffer();

    // Id of the scheduled flush timer; ObjectId.Empty while no flush is pending.
    private ObjectId sendTimer = ObjectId.Empty;

    // Invoked by the receive loop whenever the parser reports a complete packet.
    // A no-op unless a RecvAsync() call is waiting for data.
    private Action onParseComplete = () => { };

    private readonly PacketParser parser;

    // Captured in the constructor, so it stays readable after the socket is released.
    private readonly string remoteAddress;

    /// <summary>
    /// Wraps <paramref name="socket"/> and immediately starts the receive loop.
    /// </summary>
    /// <param name="socket">Socket used for all I/O of this channel.</param>
    /// <param name="service">Service that provides the timer and from which the
    /// channel removes itself when disposed.</param>
    public TChannel(TSocket socket, TService service)
    {
      this.socket = socket;
      this.service = service;
      this.parser = new PacketParser(this.recvBuffer);
      this.remoteAddress = this.socket.RemoteAddress;
      this.StartRecv();
    }

    /// <summary>
    /// Releases the channel. Does nothing if the channel was already disposed.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when
    /// called from the finalizer.
    /// </param>
    private void Dispose(bool disposing)
    {
      if (this.socket == null)
      {
        return;
      }

      if (disposing)
      {
        // Managed objects may only be touched on the explicit Dispose() path.
        // On the finalizer path they may already have been finalized, and the
        // call would run on the finalizer thread, so the service is left alone.
        this.socket.Dispose();
        this.service.Remove(this);
      }

      this.socket = null;
    }

    ~TChannel()
    {
      this.Dispose(false);
    }

    /// <summary>
    /// Disposes the socket, removes the channel from its service and
    /// suppresses finalization.
    /// </summary>
    public override void Dispose()
    {
      this.Dispose(true);
      GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Queues <paramref name="buffer"/> for sending, preceded by its length, and
    /// schedules a flush if none is pending. The method returns as soon as the
    /// data is queued; the actual socket write happens in the timer callback.
    /// </summary>
    /// <param name="buffer">Packet payload. Must not be <c>null</c>.</param>
    /// <param name="channelID">Not used by this implementation.</param>
    /// <param name="flags">Not used by this implementation.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="buffer"/> is <c>null</c>.
    /// </exception>
    public override void SendAsync(
        byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
    {
      // Fail with a descriptive exception before anything is queued, instead of
      // a NullReferenceException on buffer.Length.
      if (buffer == null)
      {
        throw new ArgumentNullException("buffer");
      }

      // Length prefix: BitConverter.GetBytes uses the byte order of the host machine.
      byte[] size = BitConverter.GetBytes(buffer.Length);
      this.sendBuffer.SendTo(size);
      this.sendBuffer.SendTo(buffer);

      // Only one flush is scheduled at a time; StartSend clears sendTimer when it finishes.
      if (this.sendTimer == ObjectId.Empty)
      {
        this.sendTimer = this.service.Timer.Add(TimeHelper.Now() + SendInterval, this.StartSend);
      }
    }

    /// <summary>
    /// Id of the currently scheduled flush timer, or <c>ObjectId.Empty</c> if
    /// no flush is pending.
    /// </summary>
    public ObjectId SendTimer
    {
      get
      {
        return this.sendTimer;
      }
    }

    /// <summary>
    /// Returns a task that yields the next packet. If the parser already has a
    /// complete packet the task is completed immediately; otherwise it is
    /// completed by the receive loop once a packet has been parsed.
    /// </summary>
    /// <returns>A task producing the bytes of the next packet.</returns>
    public override Task<byte[]> RecvAsync()
    {
      var tcs = new TaskCompletionSource<byte[]>();
      if (this.parser.Parse())
      {
        tcs.SetResult(this.parser.GetPacket());
      }
      else
      {
        // NOTE(review): this overwrites any callback installed by an earlier,
        // still pending RecvAsync() call, whose task is then never completed.
        this.onParseComplete = () => this.ParseComplete(tcs);
      }
      return tcs.Task;
    }

    /// <summary>
    /// Forwards to the socket's <c>DisconnectAsync</c> and returns its result.
    /// </summary>
    public override async Task<bool> DisconnnectAsync()
    {
      return await this.socket.DisconnectAsync();
    }

    /// <summary>
    /// Remote address reported by the socket when the channel was created.
    /// </summary>
    public override string RemoteAddress
    {
      get
      {
        return this.remoteAddress;
      }
    }

    /// <summary>
    /// Completes a pending <see cref="RecvAsync"/> task with the packet the
    /// parser has just finished.
    /// </summary>
    private void ParseComplete(TaskCompletionSource<byte[]> tcs)
    {
      byte[] packet = this.parser.GetPacket();
      // The callback is reset before the task is completed; presumably so that a
      // continuation that calls RecvAsync() from inside SetResult does not have
      // its freshly installed callback overwritten afterwards.
      this.onParseComplete = () => { };
      tcs.SetResult(packet);
    }

    /// <summary>
    /// Timer callback that writes the send buffer to the socket until it is
    /// empty. It is <c>async void</c> because it is handed to the timer as a
    /// plain callback, so every exception is caught and logged here.
    /// </summary>
    private async void StartSend()
    {
      try
      {
        while (true)
        {
          if (this.sendBuffer.Count == 0)
          {
            break;
          }

          // Send at most the rest of the first chunk, capped by the bytes queued.
          int sendSize = TBuffer.ChunkSize - this.sendBuffer.FirstIndex;
          if (sendSize > this.sendBuffer.Count)
          {
            sendSize = this.sendBuffer.Count;
          }

          int n =
              await this.socket.SendAsync(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
          this.sendBuffer.FirstIndex += n;

          // First chunk fully sent: drop it and continue at the start of the next one.
          if (this.sendBuffer.FirstIndex == TBuffer.ChunkSize)
          {
            this.sendBuffer.FirstIndex = 0;
            this.sendBuffer.RemoveFirst();
          }
        }
      }
      catch (Exception e)
      {
        Log.Debug(e.ToString());
      }
      finally
      {
        // Always allow the next SendAsync call to schedule a new flush.
        this.sendTimer = ObjectId.Empty;
      }
    }

    /// <summary>
    /// Receive loop started by the constructor. Reads from the socket into the
    /// receive buffer and notifies a waiting <see cref="RecvAsync"/> call when
    /// the parser has a complete packet. The loop ends when the socket reports
    /// 0 bytes or an exception is thrown; exceptions are logged, not rethrown.
    /// </summary>
    private async void StartRecv()
    {
      // NOTE(review): when this loop ends, a task handed out by RecvAsync() that
      // is still waiting is never completed. Confirm callers can cope with that.
      try
      {
        while (true)
        {
          // Read into the free space that remains in the last chunk.
          int n =
              await
                  this.socket.RecvAsync(this.recvBuffer.Last, this.recvBuffer.LastIndex,
                      TBuffer.ChunkSize - this.recvBuffer.LastIndex);
          if (n == 0)
          {
            // Presumably the remote side closed the connection (TODO confirm).
            break;
          }

          this.recvBuffer.LastIndex += n;

          // Last chunk is full: append a new chunk and continue writing at its start.
          if (this.recvBuffer.LastIndex == TBuffer.ChunkSize)
          {
            this.recvBuffer.AddLast();
            this.recvBuffer.LastIndex = 0;
          }

          // Parse the received data into packets.
          if (this.parser.Parse())
          {
            this.onParseComplete();
          }
        }
      }
      catch (Exception e)
      {
        Log.Trace(e.ToString());
      }
    }
  }
  164. }