Session.cs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. namespace Model
  7. {
  8. public sealed class Session : Entity
  9. {
  10. private static uint RpcId { get; set; }
  11. private readonly NetworkComponent network;
  12. private readonly Dictionary<uint, Action<object>> requestCallback = new Dictionary<uint, Action<object>>();
  13. private readonly AChannel channel;
  14. private bool isRpc;
  15. private readonly IMessagePacker messagePacker;
  16. public Session(NetworkComponent network, AChannel channel, IMessagePacker messagePacker)
  17. {
  18. this.network = network;
  19. this.channel = channel;
  20. this.messagePacker = messagePacker;
  21. this.StartRecv();
  22. }
  23. public string RemoteAddress
  24. {
  25. get
  26. {
  27. return this.channel.RemoteAddress;
  28. }
  29. }
  30. public ChannelType ChannelType
  31. {
  32. get
  33. {
  34. return this.channel.ChannelType;
  35. }
  36. }
  37. private async void StartRecv()
  38. {
  39. TimerComponent timerComponent = Game.Scene.GetComponent<TimerComponent>();
  40. while (true)
  41. {
  42. if (this.Id == 0)
  43. {
  44. return;
  45. }
  46. byte[] messageBytes;
  47. try
  48. {
  49. if (this.isRpc)
  50. {
  51. this.isRpc = false;
  52. await timerComponent.WaitAsync(0);
  53. }
  54. messageBytes = await channel.Recv();
  55. }
  56. catch (Exception e)
  57. {
  58. Log.Error(e.ToString());
  59. continue;
  60. }
  61. if (messageBytes.Length < 3)
  62. {
  63. continue;
  64. }
  65. ushort opcode = BitConverter.ToUInt16(messageBytes, 0);
  66. try
  67. {
  68. this.Run(opcode, messageBytes);
  69. }
  70. catch (Exception e)
  71. {
  72. Log.Error(e.ToString());
  73. }
  74. }
  75. }
  76. private void Run(ushort opcode, byte[] messageBytes)
  77. {
  78. int offset = 0;
  79. byte flag = messageBytes[2];
  80. bool isCompressed = (flag & 0x80) > 0;
  81. const int opcodeAndFlagLength = 3;
  82. if (isCompressed) // 最高位为1,表示有压缩,需要解压缩
  83. {
  84. messageBytes = ZipHelper.Decompress(messageBytes, opcodeAndFlagLength, messageBytes.Length - opcodeAndFlagLength);
  85. offset = 0;
  86. }
  87. else
  88. {
  89. offset = opcodeAndFlagLength;
  90. }
  91. this.RunDecompressedBytes(opcode, messageBytes, offset);
  92. }
  93. private void RunDecompressedBytes(ushort opcode, byte[] messageBytes, int offset)
  94. {
  95. Type messageType = this.network.Owner.GetComponent<OpcodeTypeComponent>().GetType(opcode);
  96. object message = messagePacker.DeserializeFrom(messageType, messageBytes, offset, messageBytes.Length - offset);
  97. // 普通消息或者是Rpc请求消息
  98. if (message is AMessage || message is ARequest)
  99. {
  100. MessageInfo messageInfo = new MessageInfo(opcode, message);
  101. Game.Scene.GetComponent<CrossComponent>().Run(CrossIdType.MessageDeserializeFinish, messageInfo);
  102. return;
  103. }
  104. AResponse response = message as AResponse;
  105. Log.Debug($"aaaaaaaaaaaaaaaaaaaaaaaaaaa {JsonHelper.ToJson(response)}");
  106. if (response != null)
  107. {
  108. // rpcFlag>0 表示这是一个rpc响应消息
  109. // Rpc回调有找不着的可能,因为client可能取消Rpc调用
  110. Action<object> action;
  111. if (!this.requestCallback.TryGetValue(response.RpcId, out action))
  112. {
  113. return;
  114. }
  115. this.requestCallback.Remove(response.RpcId);
  116. action(message);
  117. return;
  118. }
  119. throw new Exception($"message type error: {message.GetType().FullName}");
  120. }
  121. /// <summary>
  122. /// Rpc调用
  123. /// </summary>
  124. public Task<Response> Call<Request, Response>(Request request, CancellationToken cancellationToken) where Request : ARequest
  125. where Response : AResponse
  126. {
  127. request.RpcId = ++RpcId;
  128. this.SendMessage(request);
  129. var tcs = new TaskCompletionSource<Response>();
  130. this.requestCallback[RpcId] = (message) =>
  131. {
  132. try
  133. {
  134. Response response = (Response)message;
  135. if (response.Error != 0)
  136. {
  137. tcs.SetException(new RpcException(response.Error, response.Message));
  138. return;
  139. }
  140. this.isRpc = true;
  141. tcs.SetResult(response);
  142. }
  143. catch (Exception e)
  144. {
  145. tcs.SetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
  146. }
  147. };
  148. cancellationToken.Register(() => { this.requestCallback.Remove(RpcId); });
  149. return tcs.Task;
  150. }
  151. /// <summary>
  152. /// Rpc调用,发送一个消息,等待返回一个消息
  153. /// </summary>
  154. public Task<Response> Call<Request, Response>(Request request) where Request : ARequest where Response : AResponse
  155. {
  156. request.RpcId = ++RpcId;
  157. this.SendMessage(request);
  158. var tcs = new TaskCompletionSource<Response>();
  159. this.requestCallback[RpcId] = (message) =>
  160. {
  161. try
  162. {
  163. Response response = (Response)message;
  164. if (response.Error != 0)
  165. {
  166. tcs.SetException(new RpcException(response.Error, response.Message));
  167. return;
  168. }
  169. this.isRpc = true;
  170. tcs.SetResult(response);
  171. }
  172. catch (Exception e)
  173. {
  174. tcs.SetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
  175. }
  176. };
  177. return tcs.Task;
  178. }
  179. public void Send<Message>(Message message) where Message : AMessage
  180. {
  181. if (this.Id == 0)
  182. {
  183. throw new Exception("session已经被Dispose了");
  184. }
  185. this.SendMessage(message);
  186. }
  187. public void Reply<Response>(Response message) where Response : AResponse
  188. {
  189. if (this.Id == 0)
  190. {
  191. throw new Exception("session已经被Dispose了");
  192. }
  193. this.SendMessage(message);
  194. }
  195. private void SendMessage(object message)
  196. {
  197. ushort opcode = this.network.Owner.GetComponent<OpcodeTypeComponent>().GetOpcode(message.GetType());
  198. byte[] opcodeBytes = BitConverter.GetBytes(opcode);
  199. byte[] messageBytes = messagePacker.SerializeToByteArray(message);
  200. byte flag = 0;
  201. if (messageBytes.Length > 100)
  202. {
  203. byte[] newMessageBytes = ZipHelper.Compress(messageBytes);
  204. if (newMessageBytes.Length < messageBytes.Length)
  205. {
  206. messageBytes = newMessageBytes;
  207. flag |= 0x80;
  208. }
  209. }
  210. byte[] flagBytes = { flag };
  211. channel.Send(new List<byte[]> { opcodeBytes, flagBytes, messageBytes });
  212. }
  213. public override void Dispose()
  214. {
  215. if (this.Id == 0)
  216. {
  217. return;
  218. }
  219. long id = this.Id;
  220. base.Dispose();
  221. this.channel.Dispose();
  222. this.network.Remove(id);
  223. }
  224. }
  225. }