// MessageComponent.cs
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using Base;
  6. namespace Model
  7. {
  /// <summary>
  /// Awake event adapter for <see cref="MessageComponent"/>: hands the channel it
  /// receives straight to the component's own <c>Awake</c> method.
  /// </summary>
  [ObjectEvent]
  public class MessageComponentEvent : ObjectEvent<MessageComponent>, IAwake<AChannel>
  {
      /// <summary>
      /// Forwards the awake call to the wrapped component.
      /// </summary>
      /// <param name="aChannel">Channel passed on to <c>MessageComponent.Awake</c>.</param>
      public void Awake(AChannel aChannel)
      {
          var component = this.GetValue();
          component.Awake(aChannel);
      }
  }
  /// <summary>
  /// Message sending and receiving over a single <see cref="AChannel"/>.
  /// </summary>
  /// <remarks>
  /// Packet layout as read and written by this class:
  /// bytes 0-1 hold the opcode, bytes 2-5 hold a 32-bit flag word and the message
  /// body follows. In the flag word, a top byte equal to 1 marks a compressed body
  /// and the low 24 bits carry the rpc id (0 is used for non-rpc messages).
  /// </remarks>
  public class MessageComponent: Component
  {
      /// <summary>Size in bytes of the opcode (2) plus the flag word (4).</summary>
      private const int HeaderSize = 6;

      /// <summary>
      /// Bits of the flag word that carry the rpc id. The top byte is excluded
      /// because <see cref="Run"/> interprets it as the compression flag.
      /// </summary>
      private const uint RpcIdMask = 0x00ffffff;

      /// <summary>Id handed to the most recent rpc call.</summary>
      private uint RpcId { get; set; }

      // Pending rpc replies, keyed by rpc id.
      private readonly Dictionary<uint, Action<byte[], int, int>> requestCallback = new Dictionary<uint, Action<byte[], int, int>>();

      // Pending WaitAsync waiters, keyed by opcode (at most one per opcode).
      private readonly Dictionary<ushort, Action<byte[], int, int>> waitCallback = new Dictionary<ushort, Action<byte[], int, int>>();

      private AChannel channel;

      private MessageHandlerComponent messageHandler;

      /// <summary>
      /// Binds the component to a channel and starts the receive loop.
      /// </summary>
      /// <param name="aChannel">Channel used for all sends and receives.</param>
      public void Awake(AChannel aChannel)
      {
          this.messageHandler = Game.Scene.GetComponent<MessageHandlerComponent>();
          this.channel = aChannel;
          this.StartRecv();
      }

      /// <summary>Remote address reported by the underlying channel.</summary>
      public string RemoteAddress
      {
          get
          {
              return this.channel.RemoteAddress;
          }
      }

      /// <summary>
      /// Receive loop: keeps reading packets from the channel and dispatching them
      /// until the component is disposed (<c>Id == 0</c>).
      /// </summary>
      private async void StartRecv()
      {
          while (true)
          {
              if (this.Id == 0)
              {
                  return;
              }

              byte[] messageBytes;
              try
              {
                  messageBytes = await this.channel.Recv();
              }
              catch (Exception e)
              {
                  // Receive errors are logged and the loop retries; the disposed
                  // check at the top of the loop ends it once the component is gone.
                  Log.Error(e.ToString());
                  continue;
              }

              // The component may have been disposed while the receive was pending;
              // do not dispatch into a disposed component.
              if (this.Id == 0)
              {
                  return;
              }

              // Drop packets too short to contain a header. The null check keeps a
              // NullReferenceException from escaping this async void method.
              if (messageBytes == null || messageBytes.Length < HeaderSize)
              {
                  continue;
              }

              ushort opcode = BitConverter.ToUInt16(messageBytes, 0);

              try
              {
                  this.Run(opcode, messageBytes);
              }
              catch (Exception e)
              {
                  Log.Error(e.ToString());
              }
          }
      }

      /// <summary>
      /// Parses the flag word of a packet, decompresses the body when flagged and
      /// dispatches the message.
      /// </summary>
      /// <param name="opcode">Opcode read from the first two bytes of the packet.</param>
      /// <param name="messageBytes">Complete packet, header included.</param>
      private void Run(ushort opcode, byte[] messageBytes)
      {
          uint flagUInt = BitConverter.ToUInt32(messageBytes, 2);
          uint rpcId = flagUInt & RpcIdMask;
          bool isCompressed = (byte)(flagUInt >> 24) == 1;

          int offset = HeaderSize;
          if (isCompressed)
          {
              // Compressed body: decompress it; the result no longer has a header.
              messageBytes = ZipHelper.Decompress(messageBytes, HeaderSize, messageBytes.Length - HeaderSize);
              offset = 0;
          }

          this.RunDecompressedBytes(opcode, rpcId, messageBytes, offset);
      }

      /// <summary>
      /// Routes a message body: first to a pending rpc callback with the same rpc id,
      /// then to a pending waiter for the opcode, otherwise to the message handler.
      /// </summary>
      /// <param name="opcode">Opcode of the message.</param>
      /// <param name="rpcId">Rpc id taken from the flag word.</param>
      /// <param name="messageBytes">Buffer containing the message body.</param>
      /// <param name="offset">Index in <paramref name="messageBytes"/> where the body starts.</param>
      private void RunDecompressedBytes(ushort opcode, uint rpcId, byte[] messageBytes, int offset)
      {
          Action<byte[], int, int> action;
          if (this.requestCallback.TryGetValue(rpcId, out action))
          {
              // Remove before invoking so the callback runs at most once.
              this.requestCallback.Remove(rpcId);
              action(messageBytes, offset, messageBytes.Length - offset);
              return;
          }

          if (this.waitCallback.TryGetValue(opcode, out action))
          {
              this.waitCallback.Remove(opcode);
              action(messageBytes, offset, messageBytes.Length - offset);
              return;
          }

          this.messageHandler.Handle(this.Owner, opcode, messageBytes, offset);
      }

      /// <summary>
      /// Produces the next rpc id. Ids stay within the bits covered by
      /// <see cref="RpcIdMask"/> so the value written by <see cref="Send(object, uint)"/>
      /// never spills into the flag byte, and 0 is skipped because it marks non-rpc messages.
      /// </summary>
      /// <returns>The new rpc id, also stored in <see cref="RpcId"/>.</returns>
      private uint NextRpcId()
      {
          uint next = (this.RpcId + 1) & RpcIdMask;
          if (next == 0)
          {
              next = 1;
          }

          this.RpcId = next;
          return next;
      }

      /// <summary>
      /// Rpc call: sends a message and waits for one message in return.
      /// </summary>
      /// <typeparam name="Response">Type the reply is deserialized into.</typeparam>
      /// <param name="request">Message to send.</param>
      /// <param name="cancellationToken">
      /// When cancelled before the reply arrives, the pending callback is dropped and
      /// the returned task is cancelled.
      /// </param>
      /// <returns>
      /// A task that completes with the reply, faults with <c>RpcException</c> when the
      /// reply carries a non-zero error number, or faults with a wrapping
      /// <see cref="Exception"/> when the reply cannot be processed.
      /// </returns>
      public Task<Response> CallAsync<Response>(object request, CancellationToken cancellationToken) where Response : IErrorMessage
      {
          // Capture the id locally: RpcId moves on with every later call, so reading
          // the property at cancellation time would target the wrong callback.
          uint rpcId = this.NextRpcId();
          this.Send(request, rpcId);

          var tcs = new TaskCompletionSource<Response>();
          CancellationTokenRegistration registration = default(CancellationTokenRegistration);

          this.requestCallback[rpcId] = (bytes, offset, count) =>
          {
              // The reply arrived; the cancellation hook is no longer needed.
              registration.Dispose();
              try
              {
                  Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
                  if (response.ErrorMessage.Errno != 0)
                  {
                      tcs.TrySetException(new RpcException(response.ErrorMessage.Errno, response.ErrorMessage.Message));
                      return;
                  }

                  tcs.TrySetResult(response);
              }
              catch (Exception e)
              {
                  tcs.TrySetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
              }
          };

          registration = cancellationToken.Register(() =>
          {
              this.requestCallback.Remove(rpcId);
              // Complete the task so awaiters are released instead of waiting forever.
              tcs.TrySetCanceled();
          });

          return tcs.Task;
      }

      /// <summary>
      /// Rpc call: sends a message and waits for one message in return.
      /// </summary>
      /// <typeparam name="Response">Type the reply is deserialized into.</typeparam>
      /// <param name="request">Message to send.</param>
      /// <returns>
      /// A task that completes with the reply, or faults as described for the
      /// cancellable overload.
      /// </returns>
      public Task<Response> CallAsync<Response>(object request) where Response : IErrorMessage
      {
          return this.CallAsync<Response>(request, CancellationToken.None);
      }

      /// <summary>
      /// Does not send anything; just waits for one incoming message.
      /// </summary>
      /// <remarks>
      /// Only one waiter per opcode is tracked: a later call for the same
      /// <typeparamref name="Response"/> type replaces the earlier callback.
      /// </remarks>
      /// <typeparam name="Response">Type of the awaited message; its opcode is looked up in the message handler.</typeparam>
      /// <param name="cancellationToken">
      /// When cancelled before the message arrives, this waiter is dropped and the
      /// returned task is cancelled.
      /// </param>
      /// <returns>
      /// A task that completes with the deserialized message, or faults with a wrapping
      /// <see cref="Exception"/> when deserialization fails.
      /// </returns>
      public Task<Response> WaitAsync<Response>(CancellationToken cancellationToken) where Response : class
      {
          var tcs = new TaskCompletionSource<Response>();
          ushort opcode = this.messageHandler.MessageOpcode[typeof(Response)];
          CancellationTokenRegistration registration = default(CancellationTokenRegistration);

          Action<byte[], int, int> callback = (bytes, offset, count) =>
          {
              // The message arrived; the cancellation hook is no longer needed.
              registration.Dispose();
              try
              {
                  Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
                  tcs.TrySetResult(response);
              }
              catch (Exception e)
              {
                  tcs.TrySetException(new Exception($"Wait Error: {typeof(Response).FullName}", e));
              }
          };
          this.waitCallback[opcode] = callback;

          registration = cancellationToken.Register(() =>
          {
              // Remove only our own callback; a newer waiter for the same opcode
              // may have replaced it in the meantime.
              Action<byte[], int, int> current;
              if (this.waitCallback.TryGetValue(opcode, out current) && ReferenceEquals(current, callback))
              {
                  this.waitCallback.Remove(opcode);
              }

              // Complete the task so awaiters are released instead of waiting forever.
              tcs.TrySetCanceled();
          });

          return tcs.Task;
      }

      /// <summary>
      /// Does not send anything; just waits for one incoming message.
      /// </summary>
      /// <typeparam name="Response">Type of the awaited message; its opcode is looked up in the message handler.</typeparam>
      /// <returns>
      /// A task that completes with the deserialized message, or faults with a wrapping
      /// <see cref="Exception"/> when deserialization fails.
      /// </returns>
      public Task<Response> WaitAsync<Response>() where Response : class
      {
          return this.WaitAsync<Response>(CancellationToken.None);
      }

      /// <summary>
      /// Sends a message that expects no rpc reply (rpc id 0).
      /// </summary>
      /// <param name="message">Message to send.</param>
      public void Send(object message)
      {
          this.Send(message, 0);
      }

      /// <summary>
      /// Serializes a message and writes it to the channel as opcode, rpc id and body.
      /// </summary>
      /// <param name="message">Message to send; its runtime type selects the opcode.</param>
      /// <param name="rpcId">Rpc id written into the packet; 0 for non-rpc messages.</param>
      /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
      /// <exception cref="Exception">No channel has been attached to this component.</exception>
      private void Send(object message, uint rpcId)
      {
          if (message == null)
          {
              throw new ArgumentNullException(nameof(message));
          }

          // Check the channel first so nothing is serialized for a send that cannot happen.
          if (this.channel == null)
          {
              throw new Exception("game channel not found!");
          }

          ushort opcode = this.messageHandler.MessageOpcode[message.GetType()];
          byte[] opcodeBytes = BitConverter.GetBytes(opcode);
          byte[] seqBytes = BitConverter.GetBytes(rpcId);
          byte[] messageBytes = MongoHelper.ToBson(message);

          this.channel.Send(new List<byte[]> { opcodeBytes, seqBytes, messageBytes });
      }

      /// <summary>
      /// Disposes the component and its channel. Calling it again after disposal
      /// (<c>Id == 0</c>) does nothing.
      /// </summary>
      public override void Dispose()
      {
          if (this.Id == 0)
          {
              return;
          }

          base.Dispose();

          // No further packets are dispatched after disposal, so drop the pending callbacks.
          this.requestCallback.Clear();
          this.waitCallback.Clear();

          if (this.channel != null)
          {
              this.channel.Dispose();
          }
      }
  }
  233. }