MessageComponent.cs 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using Base;
  6. namespace Model
  7. {
  8. [ObjectEvent]
  9. public class MessageComponentEvent : ObjectEvent<MessageComponent>, IAwake<MessageHandlerComponent, AChannel>
  10. {
  11. public void Awake(MessageHandlerComponent messageHandler, AChannel aChannel)
  12. {
  13. this.GetValue().Awake(messageHandler, aChannel);
  14. }
  15. }
  16. /// <summary>
  17. /// 消息收发
  18. /// </summary>
  19. public class MessageComponent: Component
  20. {
  21. private uint RpcId { get; set; }
  22. private readonly Dictionary<uint, Action<byte[], int, int>> requestCallback = new Dictionary<uint, Action<byte[], int, int>>();
  23. private readonly Dictionary<ushort, Action<byte[], int, int>> waitCallback = new Dictionary<ushort, Action<byte[], int, int>>();
  24. private AChannel channel;
  25. private MessageHandlerComponent messageHandler;
  26. public void Awake(MessageHandlerComponent handler, AChannel aChannel)
  27. {
  28. this.messageHandler = handler;
  29. this.channel = aChannel;
  30. this.UpdateChannel();
  31. }
  32. private async void UpdateChannel()
  33. {
  34. while (true)
  35. {
  36. byte[] messageBytes;
  37. try
  38. {
  39. messageBytes = await channel.Recv();
  40. }
  41. catch (Exception e)
  42. {
  43. Log.Error(e.ToString());
  44. continue;
  45. }
  46. if (messageBytes.Length < 6)
  47. {
  48. continue;
  49. }
  50. ushort opcode = BitConverter.ToUInt16(messageBytes, 0);
  51. try
  52. {
  53. this.Run(opcode, messageBytes);
  54. }
  55. catch (Exception e)
  56. {
  57. Log.Error(e.ToString());
  58. }
  59. }
  60. }
  61. private void Run(ushort opcode, byte[] messageBytes)
  62. {
  63. int offset = 0;
  64. uint flagUInt = BitConverter.ToUInt32(messageBytes, 2);
  65. bool isCompressed = (byte)(flagUInt >> 24) == 1;
  66. if (isCompressed) // 表示有压缩,需要解压缩
  67. {
  68. messageBytes = ZipHelper.Decompress(messageBytes, 6, messageBytes.Length - 6);
  69. offset = 0;
  70. }
  71. else
  72. {
  73. offset = 6;
  74. }
  75. uint rpcId = flagUInt & 0x0fff;
  76. this.RunDecompressedBytes(opcode, rpcId, messageBytes, offset);
  77. }
  78. private void RunDecompressedBytes(ushort opcode, uint rpcId, byte[] messageBytes, int offset)
  79. {
  80. Action<byte[], int, int> action;
  81. if (this.requestCallback.TryGetValue(rpcId, out action))
  82. {
  83. this.requestCallback.Remove(rpcId);
  84. action(messageBytes, offset, messageBytes.Length - offset);
  85. return;
  86. }
  87. if (this.waitCallback.TryGetValue(opcode, out action))
  88. {
  89. this.waitCallback.Remove(opcode);
  90. action(messageBytes, offset, messageBytes.Length - offset);
  91. return;
  92. }
  93. this.messageHandler.Handle(this.Owner, opcode, messageBytes, offset);
  94. }
  95. public Task<Response> CallAsync<Response>(object request, CancellationToken cancellationToken) where Response : IErrorMessage
  96. {
  97. this.Send(request, ++this.RpcId);
  98. var tcs = new TaskCompletionSource<Response>();
  99. this.requestCallback[this.RpcId] = (bytes, offset, count) =>
  100. {
  101. try
  102. {
  103. Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
  104. ushort opcode = this.messageHandler.MessageOpcode[request.GetType()];
  105. if (response.ErrorMessage.errno != 0)
  106. {
  107. tcs.SetException(new RpcException(response.ErrorMessage.errno, response.ErrorMessage.msg.Utf8ToStr()));
  108. return;
  109. }
  110. tcs.SetResult(response);
  111. }
  112. catch (Exception e)
  113. {
  114. tcs.SetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
  115. }
  116. };
  117. cancellationToken.Register(() => { this.requestCallback.Remove(this.RpcId); });
  118. return tcs.Task;
  119. }
  120. /// <summary>
  121. /// Rpc调用,发送一个消息,等待返回一个消息
  122. /// </summary>
  123. /// <typeparam name="Response"></typeparam>
  124. /// <param name="request"></param>
  125. /// <returns></returns>
  126. public Task<Response> CallAsync<Response>(object request) where Response : IErrorMessage
  127. {
  128. this.Send(request, ++this.RpcId);
  129. var tcs = new TaskCompletionSource<Response>();
  130. this.requestCallback[this.RpcId] = (bytes, offset, count) =>
  131. {
  132. try
  133. {
  134. Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
  135. if (response.ErrorMessage.errno != 0)
  136. {
  137. tcs.SetException(new RpcException(response.ErrorMessage.errno, response.ErrorMessage.msg.Utf8ToStr()));
  138. return;
  139. }
  140. tcs.SetResult(response);
  141. }
  142. catch (Exception e)
  143. {
  144. tcs.SetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
  145. }
  146. };
  147. return tcs.Task;
  148. }
  149. /// <summary>
  150. /// 不发送消息,直接等待返回一个消息
  151. /// </summary>
  152. /// <typeparam name="Response"></typeparam>
  153. /// <param name="cancellationToken"></param>
  154. /// <returns></returns>
  155. public Task<Response> WaitAsync<Response>(CancellationToken cancellationToken) where Response : class
  156. {
  157. var tcs = new TaskCompletionSource<Response>();
  158. ushort opcode = this.messageHandler.MessageOpcode[typeof(Response)];
  159. this.waitCallback[opcode] = (bytes, offset, count) =>
  160. {
  161. try
  162. {
  163. Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
  164. tcs.SetResult(response);
  165. }
  166. catch (Exception e)
  167. {
  168. tcs.SetException(new Exception($"Wait Error: {typeof(Response).FullName}", e));
  169. }
  170. };
  171. cancellationToken.Register(() => { this.waitCallback.Remove(opcode); });
  172. return tcs.Task;
  173. }
  174. /// <summary>
  175. /// 不发送消息,直接等待返回一个消息
  176. /// </summary>
  177. /// <typeparam name="Response"></typeparam>
  178. /// <returns></returns>
  179. public Task<Response> WaitAsync<Response>() where Response : class
  180. {
  181. var tcs = new TaskCompletionSource<Response>();
  182. ushort opcode = this.messageHandler.MessageOpcode[typeof(Response)];
  183. this.waitCallback[opcode] = (bytes, offset, count) =>
  184. {
  185. try
  186. {
  187. Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
  188. tcs.SetResult(response);
  189. }
  190. catch (Exception e)
  191. {
  192. tcs.SetException(new Exception($"Wait Error: {typeof(Response).FullName}", e));
  193. }
  194. };
  195. return tcs.Task;
  196. }
  197. public void Send(object message)
  198. {
  199. this.Send(message, 0);
  200. }
  201. private void Send(object message, uint rpcId)
  202. {
  203. ushort opcode = this.messageHandler.MessageOpcode[message.GetType()];
  204. byte[] opcodeBytes = BitConverter.GetBytes(opcode);
  205. byte[] seqBytes = BitConverter.GetBytes(rpcId);
  206. byte[] messageBytes = MongoHelper.ToBson(message);
  207. if (channel == null)
  208. {
  209. throw new Exception("game channel not found!");
  210. }
  211. channel.Send(new List<byte[]> { opcodeBytes, seqBytes, messageBytes });
  212. }
  213. public override void Dispose()
  214. {
  215. if (this.Id == 0)
  216. {
  217. return;
  218. }
  219. base.Dispose();
  220. channel.Dispose();
  221. }
  222. }
  223. }