MessageLocationSenderComponentSystem.cs 12 KB


  1. using System;
  2. using System.IO;
  3. using MongoDB.Bson;
  4. namespace ET.Server
  5. {
  6. [EntitySystemOf(typeof(MessageLocationSenderOneType))]
  7. [FriendOf(typeof(MessageLocationSenderOneType))]
  8. [FriendOf(typeof(MessageLocationSender))]
  9. public static partial class MessageLocationSenderComponentSystem
  10. {
  11. [Invoke(TimerInvokeType.MessageLocationSenderChecker)]
  12. public class MessageLocationSenderChecker: ATimer<MessageLocationSenderOneType>
  13. {
  14. protected override void Run(MessageLocationSenderOneType self)
  15. {
  16. try
  17. {
  18. self.Check();
  19. }
  20. catch (Exception e)
  21. {
  22. Log.Error($"move timer error: {self.Id}\n{e}");
  23. }
  24. }
  25. }
  26. [EntitySystem]
  27. private static void Awake(this MessageLocationSenderOneType self)
  28. {
  29. // 每10s扫描一次过期的actorproxy进行回收,过期时间是2分钟
  30. // 可能由于bug或者进程挂掉,导致ActorLocationSender发送的消息没有确认,结果无法自动删除,每一分钟清理一次这种ActorLocationSender
  31. self.CheckTimer = self.Root().GetComponent<TimerComponent>().NewRepeatedTimer(10 * 1000, TimerInvokeType.MessageLocationSenderChecker, self);
  32. }
  33. [EntitySystem]
  34. private static void Destroy(this MessageLocationSenderOneType self)
  35. {
  36. self.Root().GetComponent<TimerComponent>()?.Remove(ref self.CheckTimer);
  37. }
  38. private static void Check(this MessageLocationSenderOneType self)
  39. {
  40. using ListComponent<long> list = ListComponent<long>.Create();
  41. long timeNow = TimeInfo.Instance.ServerNow();
  42. foreach ((long key, Entity value) in self.Children)
  43. {
  44. MessageLocationSender messageLocationMessageSender = (MessageLocationSender) value;
  45. if (timeNow > messageLocationMessageSender.LastSendOrRecvTime + MessageLocationSenderOneType.TIMEOUT_TIME)
  46. {
  47. list.Add(key);
  48. }
  49. }
  50. foreach (long id in list)
  51. {
  52. self.Remove(id);
  53. }
  54. }
  55. private static MessageLocationSender GetOrCreate(this MessageLocationSenderOneType self, long id)
  56. {
  57. if (id == 0)
  58. {
  59. throw new Exception($"actor id is 0");
  60. }
  61. if (self.Children.TryGetValue(id, out Entity actorLocationSender))
  62. {
  63. return (MessageLocationSender) actorLocationSender;
  64. }
  65. actorLocationSender = self.AddChildWithId<MessageLocationSender>(id);
  66. return (MessageLocationSender) actorLocationSender;
  67. }
  68. // 有需要主动删除actorMessageSender的需求,比如断线重连,玩家登录了不同的Gate,这时候需要通知map删掉之前的actorMessageSender
  69. // 然后重新创建新的,重新请求新的ActorId
  70. public static void Remove(this MessageLocationSenderOneType self, long id)
  71. {
  72. if (!self.Children.TryGetValue(id, out Entity actorMessageSender))
  73. {
  74. return;
  75. }
  76. actorMessageSender.Dispose();
  77. }
  78. // 发给不会改变位置的actorlocation用这个,这种actor消息不会阻塞发送队列,性能更高
  79. // 发送过去找不到actor不会重试,用此方法,你得保证actor提前注册好了location
  80. public static void Send(this MessageLocationSenderOneType self, long entityId, IMessage message)
  81. {
  82. self.SendInner(entityId, message).NoContext();
  83. }
  84. private static async ETTask SendInner(this MessageLocationSenderOneType self, long entityId, IMessage message)
  85. {
  86. MessageLocationSender messageLocationSender = self.GetOrCreate(entityId);
  87. Scene root = self.Root();
  88. long instanceId = messageLocationSender.InstanceId;
  89. using (await root.GetComponent<CoroutineLockComponent>().Wait(CoroutineLockType.MessageLocationSender, entityId))
  90. {
  91. if (messageLocationSender.InstanceId != instanceId)
  92. {
  93. throw new RpcException(ErrorCode.ERR_MessageTimeout, $"{message}");
  94. }
  95. if (messageLocationSender.ActorId == default)
  96. {
  97. messageLocationSender.ActorId = await root.GetComponent<LocationProxyComponent>().Get((int)self.Id, messageLocationSender.Id);
  98. if (messageLocationSender.InstanceId != instanceId)
  99. {
  100. throw new RpcException(ErrorCode.ERR_ActorLocationSenderTimeout2, $"{message}");
  101. }
  102. }
  103. messageLocationSender.LastSendOrRecvTime = TimeInfo.Instance.ServerNow();
  104. root.GetComponent<MessageSender>().Send(messageLocationSender.ActorId, message);
  105. }
  106. }
  107. // 发给不会改变位置的actorlocation用这个,这种actor消息不会阻塞发送队列,性能更高,发送过去找不到actor不会重试
  108. // 发送过去找不到actor不会重试,用此方法,你得保证actor提前注册好了location
  109. public static async ETTask<IResponse> Call(this MessageLocationSenderOneType self, long entityId, IRequest request)
  110. {
  111. MessageLocationSender messageLocationSender = self.GetOrCreate(entityId);
  112. Scene root = self.Root();
  113. long instanceId = messageLocationSender.InstanceId;
  114. using (await root.GetComponent<CoroutineLockComponent>().Wait(CoroutineLockType.MessageLocationSender, entityId))
  115. {
  116. if (messageLocationSender.InstanceId != instanceId)
  117. {
  118. throw new RpcException(ErrorCode.ERR_MessageTimeout, $"{request}");
  119. }
  120. if (messageLocationSender.ActorId == default)
  121. {
  122. messageLocationSender.ActorId = await root.GetComponent<LocationProxyComponent>().Get((int)self.Id, messageLocationSender.Id);
  123. if (messageLocationSender.InstanceId != instanceId)
  124. {
  125. throw new RpcException(ErrorCode.ERR_ActorLocationSenderTimeout2, $"{request}");
  126. }
  127. }
  128. }
  129. messageLocationSender.LastSendOrRecvTime = TimeInfo.Instance.ServerNow();
  130. return await root.GetComponent<MessageSender>().Call(messageLocationSender.ActorId, request);
  131. }
  132. public static void Send(this MessageLocationSenderOneType self, long entityId, ILocationMessage message)
  133. {
  134. self.Call(entityId, message).NoContext();
  135. }
  136. public static async ETTask<IResponse> Call(this MessageLocationSenderOneType self, long entityId, ILocationRequest iRequest)
  137. {
  138. MessageLocationSender messageLocationSender = self.GetOrCreate(entityId);
  139. Scene root = self.Root();
  140. Type iRequestType = iRequest.GetType();
  141. long actorLocationSenderInstanceId = messageLocationSender.InstanceId;
  142. using (await root.GetComponent<CoroutineLockComponent>().Wait(CoroutineLockType.MessageLocationSender, entityId))
  143. {
  144. if (messageLocationSender.InstanceId != actorLocationSenderInstanceId)
  145. {
  146. throw new RpcException(ErrorCode.ERR_NotFoundActor, $"{iRequest}");
  147. }
  148. try
  149. {
  150. return await self.CallInner(messageLocationSender, iRequest);
  151. }
  152. catch (RpcException)
  153. {
  154. self.Remove(messageLocationSender.Id);
  155. throw;
  156. }
  157. catch (Exception e)
  158. {
  159. self.Remove(messageLocationSender.Id);
  160. throw new Exception($"{iRequestType.FullName}", e);
  161. }
  162. }
  163. }
  164. private static async ETTask<IResponse> CallInner(this MessageLocationSenderOneType self, MessageLocationSender messageLocationSender, IRequest iRequest)
  165. {
  166. int failTimes = 0;
  167. long instanceId = messageLocationSender.InstanceId;
  168. messageLocationSender.LastSendOrRecvTime = TimeInfo.Instance.ServerNow();
  169. Scene root = self.Root();
  170. Type requestType = iRequest.GetType();
  171. while (true)
  172. {
  173. if (messageLocationSender.ActorId == default)
  174. {
  175. messageLocationSender.ActorId = await root.GetComponent<LocationProxyComponent>().Get((int)self.Id, messageLocationSender.Id);
  176. if (messageLocationSender.InstanceId != instanceId)
  177. {
  178. throw new RpcException(ErrorCode.ERR_ActorLocationSenderTimeout2, $"{iRequest}");
  179. }
  180. }
  181. if (messageLocationSender.ActorId == default)
  182. {
  183. return MessageHelper.CreateResponse(requestType, 0, ErrorCode.ERR_NotFoundActor);
  184. }
  185. IResponse response = await root.GetComponent<MessageSender>().Call(messageLocationSender.ActorId, iRequest, needException: false);
  186. if (messageLocationSender.InstanceId != instanceId)
  187. {
  188. throw new RpcException(ErrorCode.ERR_ActorLocationSenderTimeout3, $"{requestType.FullName}");
  189. }
  190. switch (response.Error)
  191. {
  192. case ErrorCode.ERR_NotFoundActor:
  193. {
  194. // 如果没找到Actor,重试
  195. ++failTimes;
  196. if (failTimes > 20)
  197. {
  198. Log.Debug($"actor send message fail, actorid: {messageLocationSender.Id} {requestType.FullName}");
  199. // 这里删除actor,后面等待发送的消息会判断InstanceId,InstanceId不一致返回ERR_NotFoundActor
  200. self.Remove(messageLocationSender.Id);
  201. return response;
  202. }
  203. // 等待0.5s再发送
  204. await root.GetComponent<TimerComponent>().WaitAsync(500);
  205. if (messageLocationSender.InstanceId != instanceId)
  206. {
  207. throw new RpcException(ErrorCode.ERR_ActorLocationSenderTimeout4, $"{requestType.FullName}");
  208. }
  209. messageLocationSender.ActorId = default;
  210. continue;
  211. }
  212. case ErrorCode.ERR_MessageTimeout:
  213. {
  214. throw new RpcException(response.Error, $"{requestType.FullName}");
  215. }
  216. }
  217. if (ErrorCode.IsRpcNeedThrowException(response.Error))
  218. {
  219. throw new RpcException(response.Error, $"Message: {response.Message} Request: {requestType.FullName}");
  220. }
  221. return response;
  222. }
  223. }
  224. }
  225. [EntitySystemOf(typeof(MessageLocationSenderComponent))]
  226. [FriendOf(typeof (MessageLocationSenderComponent))]
  227. public static partial class MessageLocationSenderManagerComponentSystem
  228. {
  229. [EntitySystem]
  230. private static void Awake(this MessageLocationSenderComponent self)
  231. {
  232. }
  233. public static MessageLocationSenderOneType Get(this MessageLocationSenderComponent self, int locationType)
  234. {
  235. MessageLocationSenderOneType messageLocationSenderOneType = self.GetChild<MessageLocationSenderOneType>(locationType);
  236. if (messageLocationSenderOneType != null)
  237. {
  238. return messageLocationSenderOneType;
  239. }
  240. messageLocationSenderOneType = self.AddChildWithId<MessageLocationSenderOneType>(locationType);
  241. return messageLocationSenderOneType;
  242. }
  243. }
  244. }