// TChannel.cs (3.8 KB)
  1. using System;
  2. using System.Threading.Tasks;
  3. using Common.Helper;
  4. using Common.Logger;
  5. using MongoDB.Bson;
  6. using Common.Network;
  7. namespace TNet
  8. {
  9. internal class TChannel: AChannel
  10. {
  11. private const int SendInterval = 0;
  12. private readonly TService service;
  13. private TSocket socket;
  14. private readonly TBuffer recvBuffer = new TBuffer();
  15. private readonly TBuffer sendBuffer = new TBuffer();
  16. private ObjectId sendTimer = ObjectId.Empty;
  17. private Action onParseComplete = () => { };
  18. private readonly PacketParser parser;
  19. private readonly string remoteAddress;
  20. public TChannel(TSocket socket, TService service)
  21. {
  22. this.socket = socket;
  23. this.service = service;
  24. this.parser = new PacketParser(this.recvBuffer);
  25. this.remoteAddress = this.socket.RemoteAddress;
  26. this.StartRecv();
  27. }
  28. protected virtual void Dispose(bool disposing)
  29. {
  30. if (this.socket == null)
  31. {
  32. return;
  33. }
  34. if (disposing)
  35. {
  36. // 释放托管的资源
  37. this.socket.Dispose();
  38. }
  39. // 释放非托管资源
  40. this.service.Remove(this);
  41. this.socket = null;
  42. }
  43. ~TChannel()
  44. {
  45. this.Dispose(false);
  46. }
  47. public override void Dispose()
  48. {
  49. this.Dispose(true);
  50. GC.SuppressFinalize(this);
  51. }
  52. public override void SendAsync(byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
  53. {
  54. byte[] size = BitConverter.GetBytes(buffer.Length);
  55. this.sendBuffer.SendTo(size);
  56. this.sendBuffer.SendTo(buffer);
  57. if (this.sendTimer == ObjectId.Empty)
  58. {
  59. this.sendTimer = this.service.Timer.Add(TimeHelper.Now() + SendInterval, this.StartSend);
  60. }
  61. }
  62. public ObjectId SendTimer
  63. {
  64. get
  65. {
  66. return this.sendTimer;
  67. }
  68. }
  69. public override Task<byte[]> RecvAsync()
  70. {
  71. var tcs = new TaskCompletionSource<byte[]>();
  72. if (this.parser.Parse())
  73. {
  74. tcs.SetResult(this.parser.GetPacket());
  75. }
  76. else
  77. {
  78. this.onParseComplete = () => this.ParseComplete(tcs);
  79. }
  80. return tcs.Task;
  81. }
  82. public override async Task<bool> DisconnnectAsync()
  83. {
  84. return await this.socket.DisconnectAsync();
  85. }
  86. public override string RemoteAddress
  87. {
  88. get
  89. {
  90. return this.remoteAddress;
  91. }
  92. }
  93. private void ParseComplete(TaskCompletionSource<byte[]> tcs)
  94. {
  95. byte[] packet = this.parser.GetPacket();
  96. this.onParseComplete = () => { };
  97. tcs.SetResult(packet);
  98. }
  99. private async void StartSend()
  100. {
  101. try
  102. {
  103. while (true)
  104. {
  105. if (this.sendBuffer.Count == 0)
  106. {
  107. break;
  108. }
  109. int sendSize = TBuffer.ChunkSize - this.sendBuffer.FirstIndex;
  110. if (sendSize > this.sendBuffer.Count)
  111. {
  112. sendSize = this.sendBuffer.Count;
  113. }
  114. int n = await this.socket.SendAsync(
  115. this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
  116. this.sendBuffer.FirstIndex += n;
  117. if (this.sendBuffer.FirstIndex == TBuffer.ChunkSize)
  118. {
  119. this.sendBuffer.FirstIndex = 0;
  120. this.sendBuffer.RemoveFirst();
  121. }
  122. }
  123. }
  124. catch (Exception e)
  125. {
  126. Log.Debug(e.ToString());
  127. }
  128. this.sendTimer = ObjectId.Empty;
  129. }
  130. private async void StartRecv()
  131. {
  132. try
  133. {
  134. while (true)
  135. {
  136. int n = await this.socket.RecvAsync(
  137. this.recvBuffer.Last, this.recvBuffer.LastIndex,
  138. TBuffer.ChunkSize - this.recvBuffer.LastIndex);
  139. if (n == 0)
  140. {
  141. break;
  142. }
  143. this.recvBuffer.LastIndex += n;
  144. if (this.recvBuffer.LastIndex == TBuffer.ChunkSize)
  145. {
  146. this.recvBuffer.AddLast();
  147. this.recvBuffer.LastIndex = 0;
  148. }
  149. // 解析封包
  150. if (this.parser.Parse())
  151. {
  152. this.onParseComplete();
  153. }
  154. }
  155. }
  156. catch (Exception e)
  157. {
  158. Log.Trace(e.ToString());
  159. }
  160. }
  161. }
  162. }