ActorMessageSenderComponentSystem.cs 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. using System;
  2. using System.IO;
  3. namespace ET
  4. {
  5. [Timer(TimerType.ActorMessageSenderChecker)]
  6. public class ActorMessageSenderChecker: ATimer<ActorMessageSenderComponent>
  7. {
  8. public override void Run(ActorMessageSenderComponent self)
  9. {
  10. try
  11. {
  12. self.Check();
  13. }
  14. catch (Exception e)
  15. {
  16. Log.Error($"move timer error: {self.Id}\n{e}");
  17. }
  18. }
  19. }
  20. [ObjectSystem]
  21. public class ActorMessageSenderComponentAwakeSystem: AwakeSystem<ActorMessageSenderComponent>
  22. {
  23. public override void Awake(ActorMessageSenderComponent self)
  24. {
  25. ActorMessageSenderComponent.Instance = self;
  26. self.TimeoutCheckTimer = TimerComponent.Instance.NewRepeatedTimer(1000, TimerType.ActorMessageSenderChecker, self);
  27. }
  28. }
  29. [ObjectSystem]
  30. public class ActorMessageSenderComponentDestroySystem: DestroySystem<ActorMessageSenderComponent>
  31. {
  32. public override void Destroy(ActorMessageSenderComponent self)
  33. {
  34. ActorMessageSenderComponent.Instance = null;
  35. TimerComponent.Instance.Remove(ref self.TimeoutCheckTimer);
  36. self.TimeoutCheckTimer = 0;
  37. self.TimeoutActorMessageSenders.Clear();
  38. }
  39. }
  40. public static class ActorMessageSenderComponentSystem
  41. {
  42. public static void Run(ActorMessageSender self, IActorResponse response)
  43. {
  44. if (response.Error == ErrorCore.ERR_ActorTimeout)
  45. {
  46. self.Tcs.SetException(new Exception($"Rpc error: request, 注意Actor消息超时,请注意查看是否死锁或者没有reply: actorId: {self.ActorId} {self.MemoryStream.ToActorMessage()}, response: {response}"));
  47. return;
  48. }
  49. if (self.NeedException && ErrorCore.IsRpcNeedThrowException(response.Error))
  50. {
  51. self.Tcs.SetException(new Exception($"Rpc error: actorId: {self.ActorId} request: {self.MemoryStream.ToActorMessage()}, response: {response}"));
  52. return;
  53. }
  54. self.Tcs.SetResult(response);
  55. }
  56. public static void Check(this ActorMessageSenderComponent self)
  57. {
  58. long timeNow = TimeHelper.ServerNow();
  59. foreach ((int key, ActorMessageSender value) in self.requestCallback)
  60. {
  61. // 因为是顺序发送的,所以,检测到第一个不超时的就退出
  62. if (timeNow < value.CreateTime + ActorMessageSenderComponent.TIMEOUT_TIME)
  63. {
  64. break;
  65. }
  66. self.TimeoutActorMessageSenders.Add(key);
  67. }
  68. foreach (int rpcId in self.TimeoutActorMessageSenders)
  69. {
  70. ActorMessageSender actorMessageSender = self.requestCallback[rpcId];
  71. self.requestCallback.Remove(rpcId);
  72. try
  73. {
  74. IActorResponse response = ActorHelper.CreateResponse((IActorRequest)actorMessageSender.MemoryStream.ToActorMessage(), ErrorCore.ERR_ActorTimeout);
  75. Run(actorMessageSender, response);
  76. }
  77. catch (Exception e)
  78. {
  79. Log.Error(e.ToString());
  80. }
  81. }
  82. self.TimeoutActorMessageSenders.Clear();
  83. }
  84. public static void Send(this ActorMessageSenderComponent self, long actorId, IMessage message)
  85. {
  86. if (actorId == 0)
  87. {
  88. throw new Exception($"actor id is 0: {message}");
  89. }
  90. ProcessActorId processActorId = new ProcessActorId(actorId);
  91. Session session = NetInnerComponent.Instance.Get(processActorId.Process);
  92. session.Send(processActorId.ActorId, message);
  93. }
  94. public static void Send(this ActorMessageSenderComponent self, long actorId, MemoryStream memoryStream)
  95. {
  96. if (actorId == 0)
  97. {
  98. throw new Exception($"actor id is 0: {memoryStream.ToActorMessage()}");
  99. }
  100. ProcessActorId processActorId = new ProcessActorId(actorId);
  101. Session session = NetInnerComponent.Instance.Get(processActorId.Process);
  102. session.Send(processActorId.ActorId, memoryStream);
  103. }
  104. public static int GetRpcId(this ActorMessageSenderComponent self)
  105. {
  106. return ++self.RpcId;
  107. }
  108. public static async ETTask<IActorResponse> Call(
  109. this ActorMessageSenderComponent self,
  110. long actorId,
  111. IActorRequest request,
  112. bool needException = true
  113. )
  114. {
  115. request.RpcId = self.GetRpcId();
  116. if (actorId == 0)
  117. {
  118. throw new Exception($"actor id is 0: {request}");
  119. }
  120. (ushort _, MemoryStream stream) = MessageSerializeHelper.MessageToStream(0, request);
  121. return await self.Call(actorId, request.RpcId, stream, needException);
  122. }
  123. public static async ETTask<IActorResponse> Call(
  124. this ActorMessageSenderComponent self,
  125. long actorId,
  126. int rpcId,
  127. MemoryStream memoryStream,
  128. bool needException = true
  129. )
  130. {
  131. if (actorId == 0)
  132. {
  133. throw new Exception($"actor id is 0: {memoryStream.ToActorMessage()}");
  134. }
  135. var tcs = ETTask<IActorResponse>.Create(true);
  136. self.requestCallback.Add(rpcId, new ActorMessageSender(actorId, memoryStream, tcs, needException));
  137. self.Send(actorId, memoryStream);
  138. long beginTime = TimeHelper.ServerFrameTime();
  139. IActorResponse response = await tcs;
  140. long endTime = TimeHelper.ServerFrameTime();
  141. long costTime = endTime - beginTime;
  142. if (costTime > 200)
  143. {
  144. Log.Warning("actor rpc time > 200: {0} {1}", costTime, memoryStream.ToActorMessage());
  145. }
  146. return response;
  147. }
  148. public static void RunMessage(this ActorMessageSenderComponent self, long actorId, IActorResponse response)
  149. {
  150. ActorMessageSender actorMessageSender;
  151. if (!self.requestCallback.TryGetValue(response.RpcId, out actorMessageSender))
  152. {
  153. return;
  154. }
  155. self.requestCallback.Remove(response.RpcId);
  156. Run(actorMessageSender, response);
  157. }
  158. }
  159. }