MessageLocationSenderComponentSystem.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. using System;
  2. using System.IO;
  3. using MongoDB.Bson;
  4. namespace ET.Server
  5. {
  6. [EntitySystemOf(typeof(MessageLocationSenderOneType))]
  7. [FriendOf(typeof(MessageLocationSenderOneType))]
  8. [FriendOf(typeof(MessageLocationSender))]
  9. public static partial class MessageLocationSenderComponentSystem
  10. {
  /// <summary>
  /// Timer handler registered under <c>TimerInvokeType.MessageLocationSenderChecker</c>.
  /// Each invocation runs <c>Check()</c> on the <c>MessageLocationSenderOneType</c> it is given.
  /// </summary>
  [Invoke(TimerInvokeType.MessageLocationSenderChecker)]
  public class MessageLocationSenderChecker: ATimer<MessageLocationSenderOneType>
  {
      /// <summary>Runs one sweep; any exception is logged instead of escaping this callback.</summary>
      /// <param name="self">The sender group whose children are swept.</param>
      protected override void Run(MessageLocationSenderOneType self)
      {
          try
          {
              self.Check();
          }
          catch (Exception e)
          {
              // The message used to read "move timer error", which pointed at an unrelated timer.
              Log.Error($"message location sender check timer error: {self.Id}\n{e}");
          }
      }
  }
  /// <summary>
  /// Starts the repeating timer that drives the sweep of idle senders.
  /// </summary>
  /// <remarks>
  /// The timer repeats every 10 seconds and is bound to
  /// <c>TimerInvokeType.MessageLocationSenderChecker</c>; its handle is kept in <c>CheckTimer</c>.
  /// The original note states a 2-minute expiry and a once-per-minute cleanup of senders whose
  /// messages were never acknowledged (bug or crashed process). The expiry actually applied is
  /// <c>MessageLocationSenderOneType.TIMEOUT_TIME</c>, declared elsewhere - TODO confirm its value.
  /// </remarks>
  [EntitySystem]
  private static void Awake(this MessageLocationSenderOneType self)
  {
      TimerComponent timerComponent = self.Root().GetComponent<TimerComponent>();
      self.CheckTimer = timerComponent.NewRepeatedTimer(10 * 1000, TimerInvokeType.MessageLocationSenderChecker, self);
  }
  /// <summary>
  /// Cancels the sweep timer stored in <c>CheckTimer</c>.
  /// </summary>
  [EntitySystem]
  private static void Destroy(this MessageLocationSenderOneType self)
  {
      TimerComponent timerComponent = self.Root().GetComponent<TimerComponent>();

      // Nothing to cancel when the root no longer has a TimerComponent.
      if (timerComponent is null)
      {
          return;
      }

      timerComponent.Remove(ref self.CheckTimer);
  }
  /// <summary>
  /// Removes every child sender whose <c>LastSendOrRecvTime</c> plus
  /// <c>MessageLocationSenderOneType.TIMEOUT_TIME</c> lies before the current server time.
  /// </summary>
  private static void Check(this MessageLocationSenderOneType self)
  {
      using (ListComponent<long> expiredIds = ListComponent<long>.Create())
      {
          long now = TimeInfo.Instance.ServerNow();

          // Ids are gathered first and removed afterwards, so Children is not touched
          // by Remove while it is being enumerated.
          foreach ((long childId, Entity child) in self.Children)
          {
              MessageLocationSender sender = (MessageLocationSender) child;
              if (now > sender.LastSendOrRecvTime + MessageLocationSenderOneType.TIMEOUT_TIME)
              {
                  expiredIds.Add(childId);
              }
          }

          foreach (long expiredId in expiredIds)
          {
              self.Remove(expiredId);
          }
      }
  }
  /// <summary>
  /// Returns the child sender stored under <paramref name="id"/>, adding a new one when none exists.
  /// </summary>
  /// <param name="id">Child id of the sender; must not be 0.</param>
  /// <returns>The existing or newly added <c>MessageLocationSender</c>.</returns>
  /// <exception cref="Exception">Thrown when <paramref name="id"/> is 0.</exception>
  private static MessageLocationSender GetOrCreate(this MessageLocationSenderOneType self, long id)
  {
      if (id == 0)
      {
          throw new Exception($"actor id is 0");
      }

      if (!self.Children.TryGetValue(id, out Entity existing))
      {
          return self.AddChildWithId<MessageLocationSender>(id);
      }

      return (MessageLocationSender) existing;
  }
  /// <summary>
  /// Disposes the child sender stored under <paramref name="id"/>; does nothing when there is none.
  /// </summary>
  /// <remarks>
  /// Senders sometimes need to be removed explicitly. Example from the original note: after a
  /// reconnect the player logs in through a different Gate, so the map must be told to drop the
  /// previous sender; a new one is then created and a fresh ActorId requested.
  /// </remarks>
  /// <param name="id">Child id of the sender to remove.</param>
  public static void Remove(this MessageLocationSenderOneType self, long id)
  {
      if (self.Children.TryGetValue(id, out Entity sender))
      {
          sender.Dispose();
      }
  }
  /// <summary>
  /// Sends a one-way message to the actor registered for <paramref name="entityId"/>.
  /// </summary>
  /// <remarks>
  /// Per the original note: use this for actors whose location does not change. Such messages do
  /// not block the send queue and are therefore cheaper, and nothing is retried when the actor
  /// cannot be found, so the caller must make sure the actor registered its location beforehand.
  /// </remarks>
  /// <param name="entityId">Id of the target entity.</param>
  /// <param name="message">Message to deliver.</param>
  public static async ETTask Send(this MessageLocationSenderOneType self, long entityId, IMessage message)
  {
      ETTask sending = self.SendInner(entityId, message);
      await sending;
  }
  /// <summary>
  /// Resolves the ActorId for <paramref name="entityId"/> if it is not cached yet, then sends
  /// <paramref name="message"/> through the root's <c>MessageSender</c>.
  /// </summary>
  /// <param name="entityId">Id of the target entity; also the key of the coroutine lock.</param>
  /// <param name="message">Message to deliver.</param>
  /// <exception cref="RpcException">
  /// <c>ERR_MessageTimeout</c> when the sender's InstanceId changed while waiting for the lock;
  /// <c>ERR_ActorLocationSenderTimeout2</c> when it changed while the location was being fetched.
  /// </exception>
  private static async ETTask SendInner(this MessageLocationSenderOneType self, long entityId, IMessage message)
  {
      MessageLocationSender messageLocationSender = self.GetOrCreate(entityId);
      Scene root = self.Root();

      // ActorId already cached: send straight away without taking the lock.
      if (messageLocationSender.ActorId != default)
      {
          messageLocationSender.LastSendOrRecvTime = TimeInfo.Instance.ServerNow();
          root.GetComponent<MessageSender>().Send(messageLocationSender.ActorId, message);
          return;
      }

      // Remember the InstanceId so a change of the sender across an await can be detected.
      long instanceId = messageLocationSender.InstanceId;

      // The lock type combines this group's id (upper bits) with the shared lock type constant.
      long coroutineLockType = (self.Id << 32) | CoroutineLockType.MessageLocationSender;
      using (await root.GetComponent<CoroutineLockComponent>().Wait(coroutineLockType, entityId))
      {
          if (messageLocationSender.InstanceId != instanceId)
          {
              throw new RpcException(ErrorCore.ERR_MessageTimeout, $"{message}");
          }

          // Another coroutine may have resolved the ActorId while this one waited for the lock.
          if (messageLocationSender.ActorId == default)
          {
              var actorId = await root.GetComponent<LocationProxyComponent>().Get((int)self.Id, messageLocationSender.Id);
              if (messageLocationSender.InstanceId != instanceId)
              {
                  throw new RpcException(ErrorCore.ERR_ActorLocationSenderTimeout2, $"{message}");
              }

              // Store the result only after the liveness check, so a sender whose
              // InstanceId changed during the await is never written to.
              messageLocationSender.ActorId = actorId;
          }

          messageLocationSender.LastSendOrRecvTime = TimeInfo.Instance.ServerNow();
          root.GetComponent<MessageSender>().Send(messageLocationSender.ActorId, message);
      }
  }
  /// <summary>
  /// Sends <paramref name="request"/> to the actor registered for <paramref name="entityId"/>
  /// and returns its response, resolving and caching the ActorId first when necessary.
  /// </summary>
  /// <remarks>
  /// Per the original note: use this for actors whose location does not change. Such messages do
  /// not block the send queue and are therefore cheaper, and nothing is retried when the actor
  /// cannot be found, so the caller must make sure the actor registered its location beforehand.
  /// </remarks>
  /// <param name="entityId">Id of the target entity; also the key of the coroutine lock.</param>
  /// <param name="request">Request to send.</param>
  /// <returns>The response produced by the root's <c>MessageSender.Call</c>.</returns>
  /// <exception cref="RpcException">
  /// <c>ERR_MessageTimeout</c> when the sender's InstanceId changed while waiting for the lock;
  /// <c>ERR_ActorLocationSenderTimeout2</c> when it changed while the location was being fetched.
  /// </exception>
  public static async ETTask<IResponse> Call(this MessageLocationSenderOneType self, long entityId, IRequest request)
  {
      MessageLocationSender messageLocationSender = self.GetOrCreate(entityId);
      Scene root = self.Root();

      // ActorId already cached: call straight away without taking the lock.
      if (messageLocationSender.ActorId != default)
      {
          messageLocationSender.LastSendOrRecvTime = TimeInfo.Instance.ServerNow();
          return await root.GetComponent<MessageSender>().Call(messageLocationSender.ActorId, request);
      }

      // Remember the InstanceId so a change of the sender across an await can be detected.
      long instanceId = messageLocationSender.InstanceId;

      // The lock type combines this group's id (upper bits) with the shared lock type constant.
      long coroutineLockType = (self.Id << 32) | CoroutineLockType.MessageLocationSender;
      using (await root.GetComponent<CoroutineLockComponent>().Wait(coroutineLockType, entityId))
      {
          if (messageLocationSender.InstanceId != instanceId)
          {
              throw new RpcException(ErrorCore.ERR_MessageTimeout, $"{request}");
          }

          // Another coroutine may have resolved the ActorId while this one waited for the lock.
          if (messageLocationSender.ActorId == default)
          {
              var actorId = await root.GetComponent<LocationProxyComponent>().Get((int)self.Id, messageLocationSender.Id);
              if (messageLocationSender.InstanceId != instanceId)
              {
                  throw new RpcException(ErrorCore.ERR_ActorLocationSenderTimeout2, $"{request}");
              }

              // Store the result only after the liveness check, so a sender whose
              // InstanceId changed during the await is never written to.
              messageLocationSender.ActorId = actorId;
          }
      }

      // The call itself runs after the lock has been released.
      messageLocationSender.LastSendOrRecvTime = TimeInfo.Instance.ServerNow();
      return await root.GetComponent<MessageSender>().Call(messageLocationSender.ActorId, request);
  }
  /// <summary>
  /// Starts a <c>Call</c> for a location message and does not await its response;
  /// the returned task is handed to <c>NoContext()</c>.
  /// </summary>
  /// <param name="entityId">Id of the target entity.</param>
  /// <param name="message">Location message to deliver.</param>
  public static void Send(this MessageLocationSenderOneType self, long entityId, ILocationMessage message)
  {
      ETTask<IResponse> pendingCall = self.Call(entityId, message);
      pendingCall.NoContext();
  }
  /// <summary>
  /// Sends a location request to the actor registered for <paramref name="entityId"/> while
  /// holding the per-entity coroutine lock, delegating the actual exchange to <c>CallInner</c>.
  /// </summary>
  /// <param name="entityId">Id of the target entity; also the key of the coroutine lock.</param>
  /// <param name="iRequest">Request to send.</param>
  /// <returns>The response returned by <c>CallInner</c>.</returns>
  /// <exception cref="RpcException">
  /// <c>ERR_NotFoundActor</c> when the sender's InstanceId changed while waiting for the lock;
  /// any <c>RpcException</c> raised by <c>CallInner</c> is rethrown unchanged.
  /// </exception>
  /// <exception cref="Exception">
  /// Wraps any other exception from <c>CallInner</c>, with the request type's full name as message.
  /// </exception>
  public static async ETTask<IResponse> Call(this MessageLocationSenderOneType self, long entityId, ILocationRequest iRequest)
  {
      MessageLocationSender sender = self.GetOrCreate(entityId);
      Scene root = self.Root();
      Type requestType = iRequest.GetType();
      long expectedInstanceId = sender.InstanceId;
      long lockType = (self.Id << 32) | CoroutineLockType.MessageLocationSender;
      CoroutineLockComponent locks = root.GetComponent<CoroutineLockComponent>();

      using (await locks.Wait(lockType, entityId))
      {
          if (expectedInstanceId != sender.InstanceId)
          {
              throw new RpcException(ErrorCore.ERR_NotFoundActor, $"{iRequest}");
          }

          try
          {
              return await self.CallInner(sender, iRequest);
          }
          catch (Exception e)
          {
              // Any failure drops the sender before the exception is passed on.
              self.Remove(sender.Id);

              if (e is RpcException)
              {
                  throw;
              }

              throw new Exception($"{requestType.FullName}", e);
          }
      }
  }
  /// <summary>
  /// Sends <paramref name="iRequest"/> through <paramref name="messageLocationSender"/>,
  /// re-resolving the actor's location and retrying while the target answers
  /// <c>ERR_NotFoundActor</c>.
  /// </summary>
  /// <param name="messageLocationSender">Sender whose cached ActorId is used and refreshed.</param>
  /// <param name="iRequest">Request to send.</param>
  /// <returns>
  /// The remote response; a locally created <c>ERR_NotFoundActor</c> response when the location
  /// lookup yields a default ActorId; or the last <c>ERR_NotFoundActor</c> response once more
  /// than 20 attempts have failed.
  /// </returns>
  /// <exception cref="RpcException">
  /// Thrown when the sender's InstanceId changes across any await
  /// (<c>ERR_ActorLocationSenderTimeout2/3/4</c>), when the response error is
  /// <c>ERR_MessageTimeout</c>, or when <c>ErrorCore.IsRpcNeedThrowException</c> accepts the error.
  /// </exception>
  private static async ETTask<IResponse> CallInner(this MessageLocationSenderOneType self, MessageLocationSender messageLocationSender, IRequest iRequest)
  {
      int failTimes = 0;

      // Remember the InstanceId so a change of the sender across an await can be detected.
      long instanceId = messageLocationSender.InstanceId;
      messageLocationSender.LastSendOrRecvTime = TimeInfo.Instance.ServerNow();
      Scene root = self.Root();
      Type requestType = iRequest.GetType();

      while (true)
      {
          if (messageLocationSender.ActorId == default)
          {
              var actorId = await root.GetComponent<LocationProxyComponent>().Get((int)self.Id, messageLocationSender.Id);
              if (messageLocationSender.InstanceId != instanceId)
              {
                  throw new RpcException(ErrorCore.ERR_ActorLocationSenderTimeout2, $"{iRequest}");
              }

              // Store the result only after the liveness check, so a sender whose
              // InstanceId changed during the await is never written to.
              messageLocationSender.ActorId = actorId;
          }

          // The lookup produced no location: answer locally instead of sending.
          if (messageLocationSender.ActorId == default)
          {
              return MessageHelper.CreateResponse(requestType, 0, ErrorCore.ERR_NotFoundActor);
          }

          IResponse response = await root.GetComponent<MessageSender>().Call(messageLocationSender.ActorId, iRequest, needException: false);
          if (messageLocationSender.InstanceId != instanceId)
          {
              throw new RpcException(ErrorCore.ERR_ActorLocationSenderTimeout3, $"{requestType.FullName}");
          }

          switch (response.Error)
          {
              case ErrorCore.ERR_NotFoundActor:
              {
                  // The actor was not found: retry.
                  ++failTimes;
                  if (failTimes > 20)
                  {
                      Log.Debug($"actor send message fail, actorid: {messageLocationSender.Id} {requestType.FullName}");

                      // Remove the sender here; messages still waiting to be sent compare
                      // InstanceId and fail with ERR_NotFoundActor once it no longer matches.
                      self.Remove(messageLocationSender.Id);
                      return response;
                  }

                  // Wait 0.5 s before sending again.
                  await root.GetComponent<TimerComponent>().WaitAsync(500);
                  if (messageLocationSender.InstanceId != instanceId)
                  {
                      throw new RpcException(ErrorCore.ERR_ActorLocationSenderTimeout4, $"{requestType.FullName}");
                  }

                  // Clear the cached ActorId so the next iteration looks the location up again.
                  messageLocationSender.ActorId = default;
                  continue;
              }
              case ErrorCore.ERR_MessageTimeout:
              {
                  throw new RpcException(response.Error, $"{requestType.FullName}");
              }
          }

          if (ErrorCore.IsRpcNeedThrowException(response.Error))
          {
              throw new RpcException(response.Error, $"Message: {response.Message} Request: {requestType.FullName}");
          }

          return response;
      }
  }
  240. }
  /// <summary>
  /// Systems for <c>MessageLocationSenderComponent</c>, which holds one
  /// <c>MessageLocationSenderOneType</c> child per location type.
  /// </summary>
  [EntitySystemOf(typeof(MessageLocationSenderComponent))]
  [FriendOf(typeof (MessageLocationSenderComponent))]
  public static partial class MessageLocationSenderManagerComponentSystem
  {
      /// <summary>Awake hook; intentionally performs no work.</summary>
      [EntitySystem]
      private static void Awake(this MessageLocationSenderComponent self)
      {
      }

      /// <summary>
      /// Returns the child stored under <paramref name="locationType"/>, adding it on first use.
      /// </summary>
      /// <param name="locationType">Location type; used as the child's id.</param>
      /// <returns>The existing or newly added <c>MessageLocationSenderOneType</c>.</returns>
      public static MessageLocationSenderOneType Get(this MessageLocationSenderComponent self, int locationType)
      {
          MessageLocationSenderOneType oneType = self.GetChild<MessageLocationSenderOneType>(locationType);
          if (oneType == null)
          {
              oneType = self.AddChildWithId<MessageLocationSenderOneType>(locationType);
          }

          return oneType;
      }
  }
  260. }