MessageComponent.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Reflection;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace Base
  8. {
  9. public enum NetChannelType
  10. {
  11. Login,
  12. Gate,
  13. Battle,
  14. }
  15. [ObjectEvent]
  16. public class MessageComponentEvent : ObjectEvent<MessageComponent>, ILoader, IAwake<SceneType>, IUpdate
  17. {
  18. public void Load()
  19. {
  20. this.GetValue().Load();
  21. }
  22. public void Awake(SceneType sceneType)
  23. {
  24. this.GetValue().Awake(sceneType);
  25. }
  26. public void Update()
  27. {
  28. this.GetValue().Update();
  29. }
  30. }
  31. /// <summary>
  32. /// 消息分发组件
  33. /// </summary>
  34. public class MessageComponent: Component
  35. {
  36. private SceneType SceneType;
  37. private uint RpcId { get; set; }
  38. private Dictionary<Opcode, List<Action<byte[], int, int>>> events;
  39. private readonly Dictionary<uint, Action<byte[], int, int>> requestCallback = new Dictionary<uint, Action<byte[], int, int>>();
  40. private readonly Dictionary<Opcode, Action<byte[], int, int>> waitCallback = new Dictionary<Opcode, Action<byte[], int, int>>();
  41. private readonly Dictionary<NetChannelType, AChannel> channels = new Dictionary<NetChannelType, AChannel>();
  42. public void Awake(SceneType sceneType)
  43. {
  44. this.SceneType = sceneType;
  45. this.Load();
  46. }
  47. public void Load()
  48. {
  49. this.events = new Dictionary<Opcode, List<Action<byte[], int, int>>>();
  50. Assembly[] assemblies = Object.ObjectManager.GetAssemblies();
  51. foreach (Assembly assembly in assemblies)
  52. {
  53. Type[] types = assembly.GetTypes();
  54. foreach (Type type in types)
  55. {
  56. object[] attrs = type.GetCustomAttributes(typeof(MessageAttribute), false);
  57. if (attrs.Length == 0)
  58. {
  59. continue;
  60. }
  61. MessageAttribute messageAttribute = (MessageAttribute)attrs[0];
  62. if (messageAttribute.SceneType != this.SceneType)
  63. {
  64. continue;
  65. }
  66. object obj = Activator.CreateInstance(type);
  67. IMRegister<MessageComponent> iMRegister = obj as IMRegister<MessageComponent>;
  68. if (iMRegister == null)
  69. {
  70. throw new GameException($"message handler not inherit IEventSync or IEventAsync interface: {obj.GetType().FullName}");
  71. }
  72. iMRegister.Register(this);
  73. }
  74. }
  75. }
  76. public void Register<T>(Action<Entity, T> action)
  77. {
  78. Opcode opcode = EnumHelper.FromString<Opcode>(typeof (T).Name);
  79. if (!this.events.ContainsKey(opcode))
  80. {
  81. this.events.Add(opcode, new List<Action<byte[], int, int>>());
  82. }
  83. List<Action<byte[], int, int>> actions = this.events[opcode];
  84. actions.Add((messageBytes, offset, count) =>
  85. {
  86. T t;
  87. try
  88. {
  89. t = MongoHelper.FromBson<T>(messageBytes, offset, count);
  90. }
  91. catch (Exception ex)
  92. {
  93. throw new GameException("解释消息失败:" + opcode, ex);
  94. }
  95. if (OpcodeHelper.IsNeedDebugLogMessage(opcode))
  96. {
  97. Log.Debug(MongoHelper.ToJson(t));
  98. }
  99. action(this.Owner, t);
  100. });
  101. }
  102. public void Connect(NetChannelType channelType, string host, int port)
  103. {
  104. AChannel channel = Share.Scene.GetComponent<NetworkComponent>().ConnectChannel(host, port);
  105. this.channels[channelType] = channel;
  106. }
  107. public void Close(NetChannelType channelType)
  108. {
  109. AChannel channel = this.GetChannel(channelType);
  110. if (channel == null || channel.Id == 0)
  111. {
  112. return;
  113. }
  114. this.channels.Remove(channelType);
  115. channel.Dispose();
  116. }
  117. public void Update()
  118. {
  119. foreach (AChannel channel in this.channels.Values.ToArray())
  120. {
  121. this.UpdateChannel(channel);
  122. }
  123. }
  124. private void UpdateChannel(AChannel channel)
  125. {
  126. if (channel.Id == 0)
  127. {
  128. return;
  129. }
  130. while (true)
  131. {
  132. byte[] messageBytes = channel.Recv();
  133. if (messageBytes == null)
  134. {
  135. return;
  136. }
  137. if (messageBytes.Length < 6)
  138. {
  139. continue;
  140. }
  141. Opcode opcode = (Opcode)BitConverter.ToUInt16(messageBytes, 0);
  142. try
  143. {
  144. this.Run(opcode, messageBytes);
  145. }
  146. catch (Exception e)
  147. {
  148. Log.Error(e.ToString());
  149. }
  150. }
  151. }
  152. public void Run(Opcode opcode, byte[] messageBytes)
  153. {
  154. int offset = 0;
  155. uint flagUInt = BitConverter.ToUInt32(messageBytes, 2);
  156. bool isCompressed = (byte)(flagUInt >> 24) == 1;
  157. if (isCompressed) // 表示有压缩,需要解压缩
  158. {
  159. messageBytes = ZipHelper.Decompress(messageBytes, 6, messageBytes.Length - 6);
  160. offset = 0;
  161. }
  162. else
  163. {
  164. offset = 6;
  165. }
  166. uint rpcId = flagUInt & 0x0fff;
  167. this.RunDecompressedBytes(opcode, rpcId, messageBytes, offset);
  168. }
  169. public void RunDecompressedBytes(Opcode opcode, uint rpcId, byte[] messageBytes, int offset)
  170. {
  171. Action<byte[], int, int> action;
  172. if (this.requestCallback.TryGetValue(rpcId, out action))
  173. {
  174. this.requestCallback.Remove(rpcId);
  175. action(messageBytes, offset, messageBytes.Length - offset);
  176. return;
  177. }
  178. if (this.waitCallback.TryGetValue(opcode, out action))
  179. {
  180. this.waitCallback.Remove(opcode);
  181. action(messageBytes, offset, messageBytes.Length - offset);
  182. return;
  183. }
  184. List<Action<byte[], int, int>> actions;
  185. if (!this.events.TryGetValue(opcode, out actions))
  186. {
  187. if (this.SceneType == SceneType.Game)
  188. {
  189. Log.Error($"消息{opcode}没有处理");
  190. }
  191. return;
  192. }
  193. foreach (var ev in actions)
  194. {
  195. try
  196. {
  197. ev(messageBytes, offset, messageBytes.Length - offset);
  198. }
  199. catch (Exception e)
  200. {
  201. Log.Error(e.ToString());
  202. }
  203. }
  204. }
  205. public Task<Response> CallAsync<Response>(object request, CancellationToken cancellationToken) where Response : IErrorMessage
  206. {
  207. this.Send(request, ++this.RpcId);
  208. var tcs = new TaskCompletionSource<Response>();
  209. this.requestCallback[this.RpcId] = (bytes, offset, count) =>
  210. {
  211. try
  212. {
  213. Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
  214. Opcode opcode = EnumHelper.FromString<Opcode>(response.GetType().Name);
  215. if (OpcodeHelper.IsNeedDebugLogMessage(opcode))
  216. {
  217. Log.Debug(MongoHelper.ToJson(response));
  218. }
  219. if (response.ErrorMessage.errno != (int)ErrorCode.ERR_Success)
  220. {
  221. tcs.SetException(new RpcException((ErrorCode)response.ErrorMessage.errno, response.ErrorMessage.msg.Utf8ToStr()));
  222. return;
  223. }
  224. tcs.SetResult(response);
  225. }
  226. catch (Exception e)
  227. {
  228. tcs.SetException(new GameException($"Rpc Error: {typeof(Response).FullName}", e));
  229. }
  230. };
  231. cancellationToken.Register(() => { this.requestCallback.Remove(this.RpcId); });
  232. return tcs.Task;
  233. }
  234. /// <summary>
  235. /// Rpc调用,发送一个消息,等待返回一个消息
  236. /// </summary>
  237. /// <typeparam name="Response"></typeparam>
  238. /// <param name="request"></param>
  239. /// <returns></returns>
  240. public Task<Response> CallAsync<Response>(object request) where Response : IErrorMessage
  241. {
  242. this.Send(request, ++this.RpcId);
  243. var tcs = new TaskCompletionSource<Response>();
  244. this.requestCallback[this.RpcId] = (bytes, offset, count) =>
  245. {
  246. try
  247. {
  248. Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
  249. Opcode opcode = EnumHelper.FromString<Opcode>(response.GetType().Name);
  250. if (OpcodeHelper.IsNeedDebugLogMessage(opcode))
  251. {
  252. Log.Debug(MongoHelper.ToJson(response));
  253. }
  254. if (response.ErrorMessage.errno != (int) ErrorCode.ERR_Success)
  255. {
  256. tcs.SetException(new RpcException((ErrorCode)response.ErrorMessage.errno, response.ErrorMessage.msg.Utf8ToStr()));
  257. return;
  258. }
  259. tcs.SetResult(response);
  260. }
  261. catch (Exception e)
  262. {
  263. tcs.SetException(new GameException($"Rpc Error: {typeof(Response).FullName}", e));
  264. }
  265. };
  266. return tcs.Task;
  267. }
  268. /// <summary>
  269. /// 不发送消息,直接等待返回一个消息
  270. /// </summary>
  271. /// <typeparam name="Response"></typeparam>
  272. /// <param name="cancellationToken"></param>
  273. /// <returns></returns>
  274. public Task<Response> WaitAsync<Response>(CancellationToken cancellationToken) where Response : class
  275. {
  276. var tcs = new TaskCompletionSource<Response>();
  277. Opcode opcode = EnumHelper.FromString<Opcode>(typeof(Response).Name);
  278. this.waitCallback[opcode] = (bytes, offset, count) =>
  279. {
  280. try
  281. {
  282. Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
  283. Opcode op = EnumHelper.FromString<Opcode>(response.GetType().Name);
  284. if (OpcodeHelper.IsNeedDebugLogMessage(op))
  285. {
  286. Log.Debug(MongoHelper.ToJson(response));
  287. }
  288. tcs.SetResult(response);
  289. }
  290. catch (Exception e)
  291. {
  292. tcs.SetException(new GameException($"Wait Error: {typeof(Response).FullName}", e));
  293. }
  294. };
  295. cancellationToken.Register(() => { this.waitCallback.Remove(opcode); });
  296. return tcs.Task;
  297. }
  298. /// <summary>
  299. /// 不发送消息,直接等待返回一个消息
  300. /// </summary>
  301. /// <typeparam name="Response"></typeparam>
  302. /// <returns></returns>
  303. public Task<Response> WaitAsync<Response>() where Response : class
  304. {
  305. var tcs = new TaskCompletionSource<Response>();
  306. Opcode opcode = EnumHelper.FromString<Opcode>(typeof(Response).Name);
  307. this.waitCallback[opcode] = (bytes, offset, count) =>
  308. {
  309. try
  310. {
  311. Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
  312. Opcode op = EnumHelper.FromString<Opcode>(response.GetType().Name);
  313. if (OpcodeHelper.IsNeedDebugLogMessage(op))
  314. {
  315. Log.Debug(MongoHelper.ToJson(response));
  316. }
  317. tcs.SetResult(response);
  318. }
  319. catch (Exception e)
  320. {
  321. tcs.SetException(new GameException($"Wait Error: {typeof(Response).FullName}", e));
  322. }
  323. };
  324. return tcs.Task;
  325. }
  326. public AChannel GetChannel(NetChannelType channelType)
  327. {
  328. AChannel channel;
  329. this.channels.TryGetValue(channelType, out channel);
  330. return channel;
  331. }
  332. public void Send(object message)
  333. {
  334. this.Send(message, 0);
  335. }
  336. public bool IsChannelConnected(NetChannelType channelType)
  337. {
  338. AChannel channel = GetChannel(channelType);
  339. if (channel == null)
  340. {
  341. return false;
  342. }
  343. return true;
  344. }
  345. private void Send(object message, uint rpcId)
  346. {
  347. Opcode opcode = EnumHelper.FromString<Opcode>(message.GetType().Name);
  348. byte[] opcodeBytes = BitConverter.GetBytes((ushort)opcode);
  349. byte[] seqBytes = BitConverter.GetBytes(rpcId);
  350. byte[] messageBytes = MongoHelper.ToBson(message);
  351. NetChannelType channelType;
  352. if ((ushort)opcode > 7000 && (ushort)opcode < 8000)
  353. {
  354. channelType = NetChannelType.Login;
  355. }
  356. else if ((ushort)opcode > 0 && (ushort)opcode <= 1000)
  357. {
  358. channelType = NetChannelType.Battle;
  359. }
  360. else
  361. {
  362. channelType = NetChannelType.Gate;
  363. }
  364. AChannel channel = this.GetChannel(channelType);
  365. if (channel == null)
  366. {
  367. throw new GameException("game channel not found!");
  368. }
  369. channel.Send(new List<byte[]> { opcodeBytes, seqBytes, messageBytes });
  370. if (OpcodeHelper.IsNeedDebugLogMessage(opcode))
  371. {
  372. Log.Debug(MongoHelper.ToJson(message));
  373. }
  374. }
  375. public override void Dispose()
  376. {
  377. if (this.Id == 0)
  378. {
  379. return;
  380. }
  381. base.Dispose();
  382. foreach (AChannel channel in this.channels.Values.ToArray())
  383. {
  384. channel.Dispose();
  385. }
  386. }
  387. }
  388. }