ActorMessageSenderComponentSystem.cs 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. using System;
  2. using System.IO;
  3. namespace ET
  4. {
  5. [FriendClass(typeof(ActorMessageSenderComponent))]
  6. public static class ActorMessageSenderComponentSystem
  7. {
  8. [Timer(TimerType.ActorMessageSenderChecker)]
  9. public class ActorMessageSenderChecker: ATimer<ActorMessageSenderComponent>
  10. {
  11. public override void Run(ActorMessageSenderComponent self)
  12. {
  13. try
  14. {
  15. self.Check();
  16. }
  17. catch (Exception e)
  18. {
  19. Log.Error($"move timer error: {self.Id}\n{e}");
  20. }
  21. }
  22. }
  23. [ObjectSystem]
  24. public class ActorMessageSenderComponentAwakeSystem: AwakeSystem<ActorMessageSenderComponent>
  25. {
  26. public override void Awake(ActorMessageSenderComponent self)
  27. {
  28. ActorMessageSenderComponent.Instance = self;
  29. self.TimeoutCheckTimer = TimerComponent.Instance.NewRepeatedTimer(1000, TimerType.ActorMessageSenderChecker, self);
  30. }
  31. }
  32. [ObjectSystem]
  33. public class ActorMessageSenderComponentDestroySystem: DestroySystem<ActorMessageSenderComponent>
  34. {
  35. public override void Destroy(ActorMessageSenderComponent self)
  36. {
  37. ActorMessageSenderComponent.Instance = null;
  38. TimerComponent.Instance.Remove(ref self.TimeoutCheckTimer);
  39. self.TimeoutCheckTimer = 0;
  40. self.TimeoutActorMessageSenders.Clear();
  41. }
  42. }
  43. public static void Run(ActorMessageSender self, IActorResponse response)
  44. {
  45. if (response.Error == ErrorCore.ERR_ActorTimeout)
  46. {
  47. self.Tcs.SetException(new Exception($"Rpc error: request, 注意Actor消息超时,请注意查看是否死锁或者没有reply: actorId: {self.ActorId} {self.MemoryStream.ToActorMessage()}, response: {response}"));
  48. return;
  49. }
  50. if (self.NeedException && ErrorCore.IsRpcNeedThrowException(response.Error))
  51. {
  52. self.Tcs.SetException(new Exception($"Rpc error: actorId: {self.ActorId} request: {self.MemoryStream.ToActorMessage()}, response: {response}"));
  53. return;
  54. }
  55. self.Tcs.SetResult(response);
  56. }
  57. public static void Check(this ActorMessageSenderComponent self)
  58. {
  59. long timeNow = TimeHelper.ServerNow();
  60. foreach ((int key, ActorMessageSender value) in self.requestCallback)
  61. {
  62. // 因为是顺序发送的,所以,检测到第一个不超时的就退出
  63. if (timeNow < value.CreateTime + ActorMessageSenderComponent.TIMEOUT_TIME)
  64. {
  65. break;
  66. }
  67. self.TimeoutActorMessageSenders.Add(key);
  68. }
  69. foreach (int rpcId in self.TimeoutActorMessageSenders)
  70. {
  71. ActorMessageSender actorMessageSender = self.requestCallback[rpcId];
  72. self.requestCallback.Remove(rpcId);
  73. try
  74. {
  75. IActorResponse response = ActorHelper.CreateResponse((IActorRequest)actorMessageSender.MemoryStream.ToActorMessage(), ErrorCore.ERR_ActorTimeout);
  76. Run(actorMessageSender, response);
  77. }
  78. catch (Exception e)
  79. {
  80. Log.Error(e.ToString());
  81. }
  82. }
  83. self.TimeoutActorMessageSenders.Clear();
  84. }
  85. public static void Send(this ActorMessageSenderComponent self, long actorId, IMessage message)
  86. {
  87. if (actorId == 0)
  88. {
  89. throw new Exception($"actor id is 0: {message}");
  90. }
  91. ProcessActorId processActorId = new ProcessActorId(actorId);
  92. Session session = NetInnerComponent.Instance.Get(processActorId.Process);
  93. session.Send(processActorId.ActorId, message);
  94. }
  95. public static void Send(this ActorMessageSenderComponent self, long actorId, MemoryStream memoryStream)
  96. {
  97. if (actorId == 0)
  98. {
  99. throw new Exception($"actor id is 0: {memoryStream.ToActorMessage()}");
  100. }
  101. ProcessActorId processActorId = new ProcessActorId(actorId);
  102. Session session = NetInnerComponent.Instance.Get(processActorId.Process);
  103. session.Send(processActorId.ActorId, memoryStream);
  104. }
  105. public static int GetRpcId(this ActorMessageSenderComponent self)
  106. {
  107. return ++self.RpcId;
  108. }
  109. public static async ETTask<IActorResponse> Call(
  110. this ActorMessageSenderComponent self,
  111. long actorId,
  112. IActorRequest request,
  113. bool needException = true
  114. )
  115. {
  116. request.RpcId = self.GetRpcId();
  117. if (actorId == 0)
  118. {
  119. throw new Exception($"actor id is 0: {request}");
  120. }
  121. (ushort _, MemoryStream stream) = MessageSerializeHelper.MessageToStream(request);
  122. return await self.Call(actorId, request.RpcId, stream, needException);
  123. }
  124. public static async ETTask<IActorResponse> Call(
  125. this ActorMessageSenderComponent self,
  126. long actorId,
  127. int rpcId,
  128. MemoryStream memoryStream,
  129. bool needException = true
  130. )
  131. {
  132. if (actorId == 0)
  133. {
  134. throw new Exception($"actor id is 0: {memoryStream.ToActorMessage()}");
  135. }
  136. var tcs = ETTask<IActorResponse>.Create(true);
  137. self.requestCallback.Add(rpcId, new ActorMessageSender(actorId, memoryStream, tcs, needException));
  138. self.Send(actorId, memoryStream);
  139. long beginTime = TimeHelper.ServerFrameTime();
  140. IActorResponse response = await tcs;
  141. long endTime = TimeHelper.ServerFrameTime();
  142. long costTime = endTime - beginTime;
  143. if (costTime > 200)
  144. {
  145. Log.Warning("actor rpc time > 200: {0} {1}", costTime, memoryStream.ToActorMessage());
  146. }
  147. return response;
  148. }
  149. public static void RunMessage(this ActorMessageSenderComponent self, long actorId, IActorResponse response)
  150. {
  151. ActorMessageSender actorMessageSender;
  152. if (!self.requestCallback.TryGetValue(response.RpcId, out actorMessageSender))
  153. {
  154. return;
  155. }
  156. self.requestCallback.Remove(response.RpcId);
  157. Run(actorMessageSender, response);
  158. }
  159. }
  160. }