TChannel.cs 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading.Tasks;
  5. using Common.Helper;
  6. using Common.Logger;
  7. using Common.Network;
  8. using MongoDB.Bson;
  9. namespace TNet
  10. {
  11. public class TChannel: AChannel
  12. {
  // Offset added to TimeHelper.Now() when the send timer is scheduled.
  private const int SendInterval = 0;

  // Underlying socket. Dispose(bool) sets it back to null, and a null socket
  // is what Dispose(bool) treats as "already disposed".
  private TSocket socket;

  // Set by the (socket, service) constructor and by ConnectAsync.
  // SendAsync only schedules a send pass while this is true.
  private bool isConnected;

  // Address exposed through the RemoteAddress property.
  private readonly string remoteAddress;

  // Incoming bytes are written here by StartRecv and consumed by the parser.
  private readonly TBuffer recvBuffer = new TBuffer();

  // Outgoing bytes are queued here by SendAsync and drained by StartSend.
  private readonly TBuffer sendBuffer = new TBuffer();

  // Id of the scheduled send timer; ObjectId.Empty means no send pass is pending.
  private ObjectId sendTimer = ObjectId.Empty;

  // Invoked by StartRecv when the parser reports a complete packet.
  // Defaults to a no-op; RecvAsync installs a real handler while a caller is waiting.
  private Action onParseComplete = () => { };

  // Extracts packets from recvBuffer.
  private readonly PacketParser parser;
  /// <summary>
  /// Creates a channel over a socket that is treated as already connected
  /// and immediately starts the receive loop.
  /// </summary>
  /// <param name="socket">Socket to read from and write to; its RemoteAddress becomes the channel's address.</param>
  /// <param name="service">Owning service, also passed to the base constructor.</param>
  public TChannel(TSocket socket, TService service): base(service)
  {
      this.socket = socket;
      this.service = service;
      this.remoteAddress = socket.RemoteAddress;
      this.parser = new PacketParser(this.recvBuffer);

      // The channel counts as connected from the start, so receiving begins right away.
      this.isConnected = true;
      this.StartRecv();
  }
  /// <summary>
  /// Creates a channel that is not yet connected. The remote address is recorded as
  /// "host:port"; ConnectAsync later reads it back to open the connection.
  /// </summary>
  /// <param name="socket">Socket that ConnectAsync will connect.</param>
  /// <param name="host">Remote host.</param>
  /// <param name="port">Remote port.</param>
  /// <param name="service">Owning service, also passed to the base constructor.</param>
  public TChannel(TSocket socket, string host, int port, TService service): base(service)
  {
      this.socket = socket;
      this.service = service;
      this.remoteAddress = string.Format("{0}:{1}", host, port);
      this.parser = new PacketParser(this.recvBuffer);
  }
  /// <summary>
  /// Shared implementation of the dispose pattern. Does nothing once the socket
  /// reference has been cleared, so repeated calls are harmless.
  /// </summary>
  /// <param name="disposing">
  /// true when called from <see cref="Dispose()"/>; false when called from the finalizer.
  /// </param>
  private void Dispose(bool disposing)
  {
      if (this.socket == null)
      {
          return;
      }

      if (disposing)
      {
          // Managed cleanup only runs on an explicit Dispose(). On the finalizer
          // thread the callback, the socket and the service are managed objects that
          // must not be touched, and an exception thrown there would be fatal.
          this.onDispose(this);
          this.socket.Dispose();
          this.service.Remove(this);
      }

      // A null socket marks the channel as disposed.
      this.socket = null;
  }

  /// <summary>
  /// Finalizer; runs the non-disposing path, which only clears the socket reference.
  /// </summary>
  ~TChannel()
  {
      this.Dispose(false);
  }

  /// <summary>
  /// Notifies the dispose callback, disposes the socket, removes the channel from
  /// its service and suppresses finalization.
  /// </summary>
  public override void Dispose()
  {
      this.Dispose(true);
      GC.SuppressFinalize(this);
  }
  /// <summary>
  /// Connects the socket to the "host:port" stored in <see cref="RemoteAddress"/>.
  /// On success the channel is marked connected, a send pass is scheduled for any
  /// data queued so far, and the receive loop is started.
  /// </summary>
  /// <returns>The value reported by the socket's ConnectAsync.</returns>
  /// <exception cref="FormatException">
  /// The address has no ':' separator or the port is not a valid integer.
  /// </exception>
  public override async Task<bool> ConnectAsync()
  {
      string address = this.RemoteAddress;

      // Split on the LAST colon so hosts that contain colons themselves
      // (for example IPv6 literals) keep their full text.
      int separator = address.LastIndexOf(':');
      if (separator < 0)
      {
          throw new FormatException("Remote address is not in host:port form: " + address);
      }
      string host = address.Substring(0, separator);
      int port = int.Parse(address.Substring(separator + 1));

      bool result = await this.socket.ConnectAsync(host, port);
      if (!result)
      {
          // Do not flag the channel as connected or start I/O on a socket
          // whose connect attempt was reported as unsuccessful.
          return false;
      }

      this.isConnected = true;
      this.SetStartSendFlag();
      this.StartRecv();
      return true;
  }
  /// <summary>
  /// Schedules <see cref="StartSend"/> on the service timer unless a send timer
  /// is already recorded for this channel.
  /// </summary>
  private void SetStartSendFlag()
  {
      if (this.sendTimer != ObjectId.Empty)
      {
          // A send pass is already scheduled or running.
          return;
      }

      var dueTime = TimeHelper.Now() + SendInterval;
      this.sendTimer = this.service.Timer.Add(dueTime, this.StartSend);
  }
  /// <summary>
  /// Queues one packet for sending: the 4-byte length of <paramref name="buffer"/>
  /// (as produced by BitConverter) followed by the payload itself.
  /// A send pass is scheduled only if the channel is already connected.
  /// </summary>
  /// <param name="buffer">Packet payload.</param>
  /// <param name="channelID">Not used by this implementation.</param>
  /// <param name="flags">Not used by this implementation.</param>
  public override void SendAsync(
      byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
  {
      byte[] lengthPrefix = BitConverter.GetBytes(buffer.Length);
      this.sendBuffer.SendTo(lengthPrefix);
      this.sendBuffer.SendTo(buffer);

      if (!this.isConnected)
      {
          // The data stays queued; ConnectAsync schedules the send pass later.
          return;
      }
      this.SetStartSendFlag();
  }
  /// <summary>
  /// Queues one packet whose payload is split across several arrays: the 4-byte
  /// combined length (as produced by BitConverter) followed by each array in order.
  /// A send pass is scheduled only if the channel is already connected.
  /// </summary>
  /// <param name="buffers">Payload fragments, sent back to back.</param>
  /// <param name="channelID">Not used by this implementation.</param>
  /// <param name="flags">Not used by this implementation.</param>
  public override void SendAsync(
      List<byte[]> buffers, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
  {
      int totalLength = buffers.Sum(fragment => fragment.Length);
      this.sendBuffer.SendTo(BitConverter.GetBytes(totalLength));

      foreach (byte[] fragment in buffers)
      {
          this.sendBuffer.SendTo(fragment);
      }

      if (!this.isConnected)
      {
          // The data stays queued; ConnectAsync schedules the send pass later.
          return;
      }
      this.SetStartSendFlag();
  }
  /// <summary>
  /// Id of the currently recorded send timer, or ObjectId.Empty when none is recorded.
  /// </summary>
  public ObjectId SendTimer
  {
      get { return this.sendTimer; }
  }
  /// <summary>
  /// Returns a task for the next packet. If the parser already has a complete
  /// packet the task is finished immediately; otherwise a handler is installed
  /// that completes it when the receive loop next reports a parsed packet.
  /// </summary>
  /// <returns>A task producing the packet bytes.</returns>
  public override Task<byte[]> RecvAsync()
  {
      var completion = new TaskCompletionSource<byte[]>();

      if (!this.parser.Parse())
      {
          // No full packet yet. This replaces whatever handler was installed before,
          // so only the most recent pending call is completed by the receive loop.
          this.onParseComplete = () => this.ParseComplete(completion);
          return completion.Task;
      }

      completion.SetResult(this.parser.GetPacket());
      return completion.Task;
  }
  /// <summary>
  /// Asks the underlying socket to disconnect.
  /// </summary>
  /// <returns>The value reported by the socket's DisconnectAsync.</returns>
  public override async Task<bool> DisconnnectAsync()
  {
      bool disconnected = await this.socket.DisconnectAsync();
      return disconnected;
  }
  /// <summary>
  /// Remote address recorded at construction: either the socket's own RemoteAddress
  /// or the "host:port" text built from the constructor arguments.
  /// </summary>
  public override string RemoteAddress
  {
      get { return this.remoteAddress; }
  }
  /// <summary>
  /// Hands the parsed packet to a pending <see cref="RecvAsync"/> call.
  /// </summary>
  /// <param name="tcs">Completion source created by the waiting RecvAsync call.</param>
  private void ParseComplete(TaskCompletionSource<byte[]> tcs)
  {
      byte[] payload = this.parser.GetPacket();

      // Restore the no-op handler BEFORE completing the task: a continuation that
      // runs inline may call RecvAsync again and install a new handler, which
      // must not be overwritten afterwards.
      this.onParseComplete = () => { };

      tcs.SetResult(payload);
  }
  /// <summary>
  /// Drains the send buffer through the socket, one chunk-bounded slice at a time,
  /// until the buffer is empty. Exceptions are logged, not propagated.
  /// </summary>
  /// <remarks>
  /// The send timer id is cleared when the pass ends, whether it succeeded or
  /// failed. Without that, a single failed send would leave the id set forever
  /// and SetStartSendFlag would never schedule another pass.
  /// </remarks>
  public async void StartSend()
  {
      try
      {
          while (this.sendBuffer.Count != 0)
          {
              // Never send past the end of the first chunk, nor more than is queued.
              int sendSize = Math.Min(TBuffer.ChunkSize - this.sendBuffer.FirstIndex, this.sendBuffer.Count);

              int n = await this.socket.SendAsync(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
              this.sendBuffer.FirstIndex += n;

              if (this.sendBuffer.FirstIndex == TBuffer.ChunkSize)
              {
                  // First chunk fully sent: drop it and start at the head of the next one.
                  this.sendBuffer.FirstIndex = 0;
                  this.sendBuffer.RemoveFirst();
              }
          }
      }
      catch (Exception e)
      {
          Log.Debug(e.ToString());
      }
      finally
      {
          // Allow the next SendAsync call to schedule a new pass.
          this.sendTimer = ObjectId.Empty;
      }
  }
  /// <summary>
  /// Receive loop: reads from the socket into the tail of the receive buffer,
  /// runs the packet parser after every read, and invokes the parse-complete
  /// handler when a packet is ready. The loop ends when a read returns 0 bytes.
  /// Exceptions are logged, not propagated.
  /// </summary>
  private async void StartRecv()
  {
      try
      {
          while (true)
          {
              // Read into whatever room is left in the last chunk.
              var tailChunk = this.recvBuffer.Last;
              var tailOffset = this.recvBuffer.LastIndex;
              int received = await this.socket.RecvAsync(
                  tailChunk, tailOffset, TBuffer.ChunkSize - tailOffset);

              if (received == 0)
              {
                  return;
              }

              this.recvBuffer.LastIndex += received;
              if (this.recvBuffer.LastIndex == TBuffer.ChunkSize)
              {
                  // Last chunk is full: append a new one and write from its start.
                  this.recvBuffer.AddLast();
                  this.recvBuffer.LastIndex = 0;
              }

              // Parse the packet
              bool packetReady = this.parser.Parse();
              if (packetReady)
              {
                  this.onParseComplete();
              }
          }
      }
      catch (Exception e)
      {
          Log.Trace(e.ToString());
      }
  }
  203. }
  204. }