MessageComponent.cs 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. namespace Base
  6. {
  /// <summary>
  /// Object-event adapter for <see cref="MessageComponent"/>: forwards the Awake
  /// call, with its handler and channel arguments, to the component returned by GetValue().
  /// </summary>
  [ObjectEvent]
  public class MessageComponentEvent : ObjectEvent<MessageComponent>, IAwake<MessageHandlerComponent, AChannel>
  {
      /// <summary>
      /// Passes the message handler and channel on to the wrapped component's Awake.
      /// </summary>
      /// <param name="messageHandler">Handler forwarded to the component.</param>
      /// <param name="aChannel">Channel forwarded to the component.</param>
      public void Awake(MessageHandlerComponent messageHandler, AChannel aChannel)
      {
          var component = this.GetValue();
          component.Awake(messageHandler, aChannel);
      }
  }
  15. /// <summary>
  16. /// 消息收发
  17. /// </summary>
  18. public class MessageComponent: Component
  19. {
  20. private uint RpcId { get; set; }
  21. private readonly Dictionary<uint, Action<byte[], int, int>> requestCallback = new Dictionary<uint, Action<byte[], int, int>>();
  22. private readonly Dictionary<ushort, Action<byte[], int, int>> waitCallback = new Dictionary<ushort, Action<byte[], int, int>>();
  23. private AChannel channel;
  24. private MessageHandlerComponent messageHandler;
  /// <summary>
  /// Stores the channel and the message handler, then starts the receive loop.
  /// </summary>
  /// <param name="handler">Handler that receives messages no pending callback claims.</param>
  /// <param name="aChannel">Channel used for sending and receiving.</param>
  public void Awake(MessageHandlerComponent handler, AChannel aChannel)
  {
      this.channel = aChannel;
      this.messageHandler = handler;

      // Fire-and-forget: UpdateChannel is async void and runs until the loop ends.
      this.UpdateChannel();
  }
  /// <summary>
  /// Receive loop. Repeatedly awaits <c>channel.Recv()</c>, drops frames that are
  /// missing or shorter than the 6-byte header, reads the opcode from the first
  /// two bytes and hands the frame to <see cref="Run"/>. Exceptions raised while
  /// receiving or dispatching are logged and the loop keeps going.
  /// </summary>
  /// <remarks>
  /// The method is <c>async void</c>, so any exception escaping it would be
  /// unobservable by callers; every operation that can throw is therefore kept
  /// inside a try block or guarded beforehand.
  /// </remarks>
  private async void UpdateChannel()
  {
      // Opcode (2 bytes) + flag/rpc-id word (4 bytes).
      const int HeaderSize = 6;

      while (true)
      {
          byte[] messageBytes;
          try
          {
              messageBytes = await channel.Recv();
          }
          catch (Exception e)
          {
              Log.Error(e.ToString());

              // Dispose() treats Id == 0 as "already disposed". Once that is the case
              // the channel has been disposed too, so retrying Recv would only spin
              // and flood the log. NOTE(review): presumably base.Dispose() resets Id
              // to 0 - confirm against Component.
              if (this.Id == 0)
              {
                  return;
              }
              continue;
          }

          // A null buffer would otherwise throw NullReferenceException outside any
          // try block, which is fatal in an async void method.
          if (messageBytes == null || messageBytes.Length < HeaderSize)
          {
              continue;
          }

          ushort opcode = BitConverter.ToUInt16(messageBytes, 0);
          try
          {
              this.Run(opcode, messageBytes);
          }
          catch (Exception e)
          {
              Log.Error(e.ToString());
          }
      }
  }
  /// <summary>
  /// Decodes the frame header and dispatches the payload.
  /// </summary>
  /// <remarks>
  /// Frame layout as read here: bytes 0-1 hold the opcode (already parsed by the
  /// caller), bytes 2-5 hold a 32-bit flag word, and the payload starts at byte 6.
  /// The top byte of the flag word equal to 1 marks a compressed payload; the
  /// remaining low 24 bits carry the rpc id.
  /// </remarks>
  /// <param name="opcode">Opcode read from the first two bytes of the frame.</param>
  /// <param name="messageBytes">The complete frame; the caller guarantees at least 6 bytes.</param>
  private void Run(ushort opcode, byte[] messageBytes)
  {
      const int HeaderSize = 6;

      // Only the top byte is used for flags, so the rpc id occupies the low 24 bits.
      // The previous 12-bit mask (0x0fff) truncated every rpc id above 4095, so the
      // matching entry in requestCallback was never found and the call never completed.
      const uint RpcIdMask = 0x00ffffff;

      uint flagUInt = BitConverter.ToUInt32(messageBytes, 2);
      bool isCompressed = (byte)(flagUInt >> 24) == 1;

      int offset;
      if (isCompressed)
      {
          // Compressed payload: decompress everything after the header. The result
          // contains only the payload, so it is read from offset 0.
          messageBytes = ZipHelper.Decompress(messageBytes, HeaderSize, messageBytes.Length - HeaderSize);
          offset = 0;
      }
      else
      {
          // Uncompressed payload: skip the header in place.
          offset = HeaderSize;
      }

      uint rpcId = flagUInt & RpcIdMask;
      this.RunDecompressedBytes(opcode, rpcId, messageBytes, offset);
  }
  /// <summary>
  /// Routes a decoded payload to whoever is waiting for it. A pending rpc
  /// callback registered under <paramref name="rpcId"/> wins; otherwise a waiter
  /// registered under <paramref name="opcode"/>; otherwise the message handler.
  /// A matched callback is removed before it is invoked, so it fires only once.
  /// </summary>
  /// <param name="opcode">Opcode of the received message.</param>
  /// <param name="rpcId">Rpc id extracted from the frame header.</param>
  /// <param name="messageBytes">Buffer containing the payload.</param>
  /// <param name="offset">Index in <paramref name="messageBytes"/> where the payload starts.</param>
  private void RunDecompressedBytes(ushort opcode, uint rpcId, byte[] messageBytes, int offset)
  {
      Action<byte[], int, int> callback;

      if (this.requestCallback.TryGetValue(rpcId, out callback))
      {
          // Response to an outstanding CallAsync.
          this.requestCallback.Remove(rpcId);
      }
      else if (this.waitCallback.TryGetValue(opcode, out callback))
      {
          // Message somebody is waiting for via WaitAsync.
          this.waitCallback.Remove(opcode);
      }
      else
      {
          // Nobody is waiting: regular handler dispatch.
          this.messageHandler.Handle(this.Owner, opcode, messageBytes, offset);
          return;
      }

      callback(messageBytes, offset, messageBytes.Length - offset);
  }
  /// <summary>
  /// Rpc call with cancellation: sends <paramref name="request"/> under a fresh
  /// rpc id and returns a task that completes when the matching response arrives.
  /// </summary>
  /// <typeparam name="Response">Expected response type.</typeparam>
  /// <param name="request">Message to send; its type must be present in the handler's MessageOpcode map.</param>
  /// <param name="cancellationToken">
  /// Cancelling this token drops the pending callback and cancels the returned task.
  /// </param>
  /// <returns>
  /// A task that yields the response, faults with <c>RpcException</c> when the
  /// response carries a non-zero Errno, faults with a wrapping exception when
  /// decoding fails, or is cancelled when the token is cancelled first.
  /// </returns>
  public Task<Response> CallAsync<Response>(object request, CancellationToken cancellationToken) where Response : IErrorMessage
  {
      // Capture the id of THIS call. Reading this.RpcId later (as the cancel
      // callback used to) would see the id of whichever call was issued last.
      uint rpcId = ++this.RpcId;
      this.Send(request, rpcId);

      var tcs = new TaskCompletionSource<Response>();
      CancellationTokenRegistration registration = default(CancellationTokenRegistration);

      this.requestCallback[rpcId] = (bytes, offset, count) =>
      {
          // The response arrived, so the cancellation hook is no longer needed.
          registration.Dispose();
          try
          {
              Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
              if (response.ErrorMessage.Errno != 0)
              {
                  tcs.TrySetException(new RpcException(response.ErrorMessage.Errno, response.ErrorMessage.Message));
                  return;
              }
              tcs.TrySetResult(response);
          }
          catch (Exception e)
          {
              tcs.TrySetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
          }
      };

      // If the token is already cancelled, Register runs the callback immediately.
      registration = cancellationToken.Register(() =>
      {
          this.requestCallback.Remove(rpcId);
          // Complete the task so awaiting callers are released instead of hanging forever.
          tcs.TrySetCanceled();
      });

      return tcs.Task;
  }
  /// <summary>
  /// Rpc call: sends a message and waits for a message to be returned.
  /// </summary>
  /// <typeparam name="Response">Expected response type.</typeparam>
  /// <param name="request">Message to send.</param>
  /// <returns>
  /// A task that yields the response, or faults with <c>RpcException</c> when the
  /// response carries a non-zero Errno, or with a wrapping exception when the
  /// response cannot be processed.
  /// </returns>
  public Task<Response> CallAsync<Response>(object request) where Response : IErrorMessage
  {
      uint rpcId = ++this.RpcId;
      this.Send(request, rpcId);

      var completion = new TaskCompletionSource<Response>();

      Action<byte[], int, int> onResponse = (bytes, offset, count) =>
      {
          try
          {
              Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
              if (response.ErrorMessage.Errno == 0)
              {
                  completion.SetResult(response);
              }
              else
              {
                  completion.SetException(new RpcException(response.ErrorMessage.Errno, response.ErrorMessage.Message));
              }
          }
          catch (Exception e)
          {
              completion.SetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
          }
      };

      // Registered under the id that was just sent; the receive path removes it
      // when the matching response arrives.
      this.requestCallback[rpcId] = onResponse;
      return completion.Task;
  }
  /// <summary>
  /// Does not send anything; just waits for a message of type
  /// <typeparamref name="Response"/> to arrive.
  /// </summary>
  /// <typeparam name="Response">Message type to wait for; must be present in the handler's MessageOpcode map.</typeparam>
  /// <param name="cancellationToken">
  /// Cancelling this token stops the wait and cancels the returned task.
  /// </param>
  /// <returns>
  /// A task that yields the decoded message, faults with a wrapping exception
  /// when decoding fails, or is cancelled when the token is cancelled first.
  /// </returns>
  public Task<Response> WaitAsync<Response>(CancellationToken cancellationToken) where Response : class
  {
      var tcs = new TaskCompletionSource<Response>();
      ushort opcode = this.messageHandler.MessageOpcode[typeof(Response)];
      CancellationTokenRegistration registration = default(CancellationTokenRegistration);

      Action<byte[], int, int> callback = (bytes, offset, count) =>
      {
          // The message arrived, so the cancellation hook is no longer needed.
          registration.Dispose();
          try
          {
              Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
              tcs.TrySetResult(response);
          }
          catch (Exception e)
          {
              tcs.TrySetException(new Exception($"Wait Error: {typeof(Response).FullName}", e));
          }
      };
      this.waitCallback[opcode] = callback;

      // If the token is already cancelled, Register runs the callback immediately.
      registration = cancellationToken.Register(() =>
      {
          // Remove only our own entry: a later wait on the same opcode may have
          // replaced it, and that waiter must not lose its callback.
          Action<byte[], int, int> current;
          if (this.waitCallback.TryGetValue(opcode, out current) && ReferenceEquals(current, callback))
          {
              this.waitCallback.Remove(opcode);
          }
          // Complete the task so awaiting callers are released instead of hanging forever.
          tcs.TrySetCanceled();
      });

      return tcs.Task;
  }
  /// <summary>
  /// Does not send anything; just waits for a message of type
  /// <typeparamref name="Response"/> to be returned.
  /// </summary>
  /// <typeparam name="Response">Message type to wait for.</typeparam>
  /// <returns>
  /// A task that yields the decoded message, or faults with a wrapping exception
  /// when the message cannot be processed.
  /// </returns>
  public Task<Response> WaitAsync<Response>() where Response : class
  {
      var completion = new TaskCompletionSource<Response>();
      ushort opcode = this.messageHandler.MessageOpcode[typeof(Response)];

      Action<byte[], int, int> onMessage = (bytes, offset, count) =>
      {
          try
          {
              completion.SetResult(MongoHelper.FromBson<Response>(bytes, offset, count));
          }
          catch (Exception e)
          {
              completion.SetException(new Exception($"Wait Error: {typeof(Response).FullName}", e));
          }
      };

      // One waiter per opcode: assigning replaces any callback already stored for it.
      this.waitCallback[opcode] = onMessage;
      return completion.Task;
  }
  /// <summary>
  /// Sends a one-way message; it is sent with rpc id 0.
  /// </summary>
  /// <param name="message">Message to send.</param>
  public void Send(object message) => this.Send(message, 0);
  /// <summary>
  /// Serializes <paramref name="message"/> and sends it on the channel as three
  /// segments: the 2-byte opcode, the 4-byte rpc id, and the bytes produced by
  /// <c>MongoHelper.ToBson</c>.
  /// </summary>
  /// <param name="message">Message to send; its type is looked up in the handler's MessageOpcode map.</param>
  /// <param name="rpcId">Rpc id written into the header.</param>
  /// <exception cref="Exception">Thrown when no channel has been set.</exception>
  private void Send(object message, uint rpcId)
  {
      ushort opcode = this.messageHandler.MessageOpcode[message.GetType()];
      byte[] body = MongoHelper.ToBson(message);

      if (this.channel == null)
      {
          throw new Exception("game channel not found!");
      }

      var segments = new List<byte[]>
      {
          BitConverter.GetBytes(opcode),
          BitConverter.GetBytes(rpcId),
          body
      };
      this.channel.Send(segments);
  }
  /// <summary>
  /// Disposes the component and the channel it uses. Does nothing when
  /// <c>Id</c> is 0, which this method treats as "already disposed".
  /// </summary>
  public override void Dispose()
  {
      if (this.Id == 0)
      {
          return;
      }

      try
      {
          base.Dispose();
      }
      finally
      {
          // Release the channel even if base.Dispose() throws. The channel is null
          // when Awake never ran, so guard against that instead of throwing here.
          channel?.Dispose();
      }
  }
  221. }
  222. }