ActorMessageSenderComponentSystem.cs 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. using System;
  2. using System.IO;
  3. namespace ET
  4. {
  /// <summary>
  /// Awake hook for <see cref="ActorMessageSenderComponent"/>: publishes the component through
  /// its static <c>Instance</c> and registers <c>Check</c> as a repeated-timer callback.
  /// </summary>
  [ObjectSystem]
  public class ActorMessageSenderComponentAwakeSystem: AwakeSystem<ActorMessageSenderComponent>
  {
      /// <summary>Initializes the freshly created component.</summary>
      /// <param name="self">The component being awakened.</param>
      public override void Awake(ActorMessageSenderComponent self)
      {
          // Interval handed to NewRepeatedTimer (presumably milliseconds; confirm against TimerComponent).
          const int checkInterval = 1000;

          ActorMessageSenderComponent.Instance = self;

          // The returned timer id is kept so the Destroy hook can remove the timer again.
          self.TimeoutCheckTimer = TimerComponent.Instance.NewRepeatedTimer(checkInterval, self.Check);
      }
  }
  /// <summary>
  /// Destroy hook for <see cref="ActorMessageSenderComponent"/>: clears the static <c>Instance</c>,
  /// removes the timeout-check timer and empties the list of timed-out rpc ids.
  /// </summary>
  [ObjectSystem]
  public class ActorMessageSenderComponentDestroySystem: DestroySystem<ActorMessageSenderComponent>
  {
      /// <summary>Releases what the Awake hook set up.</summary>
      /// <param name="self">The component being destroyed.</param>
      public override void Destroy(ActorMessageSenderComponent self)
      {
          ActorMessageSenderComponent.Instance = null;

          // Scratch list used by Check(); nothing in it is needed once the component is gone.
          self.TimeoutActorMessageSenders.Clear();

          // The id is reset explicitly as well, independent of what Remove does to the ref argument.
          TimerComponent.Instance.Remove(ref self.TimeoutCheckTimer);
          self.TimeoutCheckTimer = 0;

          // NOTE(review): entries still in requestCallback are not touched here, so their
          // awaiters are not completed by this hook.
      }
  }
  25. public static class ActorMessageSenderComponentSystem
  26. {
  /// <summary>
  /// Completes the pending call represented by <paramref name="self"/> with
  /// <paramref name="response"/>, either as a result or as an exception.
  /// </summary>
  /// <param name="self">The pending request whose completion source is resolved.</param>
  /// <param name="response">The response to deliver (a real reply or a synthesized timeout).</param>
  public static void Run(ActorMessageSender self, IActorResponse response)
  {
      Exception failure = null;

      if (response.Error == ErrorCode.ERR_ActorTimeout)
      {
          // Timeouts always fail the call, regardless of NeedException. The message text warns
          // that the actor message timed out and suggests checking for a deadlock or a missing reply.
          failure = new Exception($"Rpc error: request, 注意Actor消息超时,请注意查看是否死锁或者没有reply: actorId: {self.ActorId} {self.MemoryStream.ToActorMessage()}, response: {response}");
      }
      else if (self.NeedException && ErrorCode.IsRpcNeedThrowException(response.Error))
      {
          // Other errors fail the call only when the caller opted in and the error code qualifies.
          failure = new Exception($"Rpc error: actorId: {self.ActorId} request: {self.MemoryStream.ToActorMessage()}, response: {response}");
      }

      if (failure == null)
      {
          self.Tcs.SetResult(response);
          return;
      }

      self.Tcs.SetException(failure);
  }
  /// <summary>
  /// Timeout scan: fails every pending request whose age has reached
  /// <c>ActorMessageSenderComponent.TIMEOUT_TIME</c> with an <c>ERR_ActorTimeout</c> response.
  /// </summary>
  /// <remarks>
  /// Timed-out ids are first collected into <c>TimeoutActorMessageSenders</c> and processed in a
  /// second pass, so <c>requestCallback</c> is never modified while it is being enumerated.
  /// </remarks>
  /// <param name="self">The component whose pending requests are scanned.</param>
  public static void Check(this ActorMessageSenderComponent self)
  {
      long timeNow = TimeHelper.ServerNow();

      foreach ((int key, ActorMessageSender value) in self.requestCallback)
      {
          // Requests are sent in order, so stop at the first one that has not timed out yet.
          // NOTE(review): this relies on requestCallback enumerating oldest-first; confirm its collection type.
          if (timeNow < value.CreateTime + ActorMessageSenderComponent.TIMEOUT_TIME)
          {
              break;
          }
          self.TimeoutActorMessageSenders.Add(key);
      }

      try
      {
          foreach (int rpcId in self.TimeoutActorMessageSenders)
          {
              // Skip ids that are no longer pending instead of throwing KeyNotFoundException.
              if (!self.requestCallback.TryGetValue(rpcId, out ActorMessageSender actorMessageSender))
              {
                  continue;
              }
              self.requestCallback.Remove(rpcId);

              bool responseBuilt = false;
              try
              {
                  IActorResponse response = ActorHelper.CreateResponse(
                          (IActorRequest)actorMessageSender.MemoryStream.ToActorMessage(), ErrorCode.ERR_ActorTimeout);
                  responseBuilt = true;
                  Run(actorMessageSender, response);
              }
              catch (Exception e)
              {
                  Log.Error(e.ToString());

                  if (!responseBuilt)
                  {
                      // The entry is already unregistered and Run was never reached, so nothing
                      // else would ever complete this call. Fail it rather than leave the caller
                      // awaiting forever.
                      try
                      {
                          actorMessageSender.Tcs.SetException(e);
                      }
                      catch (Exception inner)
                      {
                          Log.Error(inner.ToString());
                      }
                  }
                  // NOTE(review): if Run itself threw before completing the Tcs, the call is
                  // still left pending; that cannot be detected from here.
              }
          }
      }
      finally
      {
          // Always reset the scratch list so a failure in one tick cannot leak stale ids into the next.
          self.TimeoutActorMessageSenders.Clear();
      }
  }
  /// <summary>
  /// Sends <paramref name="message"/> to the actor identified by <paramref name="actorId"/>
  /// without registering a pending response.
  /// </summary>
  /// <param name="self">The sender component (not read by this method).</param>
  /// <param name="actorId">Target actor id; must not be 0.</param>
  /// <param name="message">The message to send.</param>
  /// <exception cref="Exception">Thrown when <paramref name="actorId"/> is 0.</exception>
  public static void Send(this ActorMessageSenderComponent self, long actorId, IMessage message)
  {
      // ProcessActorId exposes the Process used to pick the session and the ActorId passed to Send.
      ProcessActorId target = actorId != 0
              ? new ProcessActorId(actorId)
              : throw new Exception($"actor id is 0: {message}");

      NetInnerComponent.Instance.Get(target.Process).Send(target.ActorId, message);
  }
  /// <summary>
  /// Sends an already serialized message to the actor identified by <paramref name="actorId"/>
  /// without registering a pending response.
  /// </summary>
  /// <param name="self">The sender component (not read by this method).</param>
  /// <param name="actorId">Target actor id; must not be 0.</param>
  /// <param name="memoryStream">Stream holding the serialized message.</param>
  /// <exception cref="Exception">Thrown when <paramref name="actorId"/> is 0.</exception>
  public static void Send(this ActorMessageSenderComponent self, long actorId, MemoryStream memoryStream)
  {
      if (actorId == 0)
      {
          // The stream is decoded only here, to make the error message readable.
          string description = $"actor id is 0: {memoryStream.ToActorMessage()}";
          throw new Exception(description);
      }

      var route = new ProcessActorId(actorId);
      Session innerSession = NetInnerComponent.Instance.Get(route.Process);
      innerSession.Send(route.ActorId, memoryStream);
  }
  /// <summary>Increments the component's <c>RpcId</c> counter and returns the new value.</summary>
  /// <param name="self">The component that owns the counter.</param>
  /// <returns>The incremented rpc id.</returns>
  public static int GetRpcId(this ActorMessageSenderComponent self) => ++self.RpcId;
  /// <summary>
  /// Sends <paramref name="request"/> to the actor identified by <paramref name="actorId"/> and
  /// awaits its response.
  /// </summary>
  /// <param name="self">The sender component.</param>
  /// <param name="actorId">Target actor id; must not be 0.</param>
  /// <param name="request">The request; its <c>RpcId</c> is overwritten with a newly allocated id.</param>
  /// <param name="needException">Forwarded to the stream-based overload.</param>
  /// <returns>The response produced for this request.</returns>
  /// <exception cref="Exception">Thrown when <paramref name="actorId"/> is 0.</exception>
  public static async ETTask<IActorResponse> Call(
          this ActorMessageSenderComponent self,
          long actorId,
          IActorRequest request,
          bool needException = true
  )
  {
      // Validate before any side effect, so an invalid call neither consumes an rpc id nor
      // mutates the caller's request. This matches the stream-based overload.
      if (actorId == 0)
      {
          throw new Exception($"actor id is 0: {request}");
      }

      request.RpcId = self.GetRpcId();

      // Only the stream part of the returned tuple is needed here.
      (ushort _, MemoryStream stream) = MessageSerializeHelper.MessageToStream(0, request);
      return await self.Call(actorId, request.RpcId, stream, needException);
  }
  /// <summary>
  /// Registers a pending request under <paramref name="rpcId"/>, sends the serialized message to
  /// the actor identified by <paramref name="actorId"/>, and awaits the response.
  /// </summary>
  /// <param name="self">The sender component.</param>
  /// <param name="actorId">Target actor id; must not be 0.</param>
  /// <param name="rpcId">Key under which the pending request is stored in <c>requestCallback</c>.</param>
  /// <param name="memoryStream">Stream holding the serialized request.</param>
  /// <param name="needException">Stored on the pending request; read by <c>Run</c> when deciding whether an error response fails the call.</param>
  /// <returns>The response delivered through the pending request's completion source.</returns>
  /// <exception cref="Exception">Thrown when <paramref name="actorId"/> is 0.</exception>
  public static async ETTask<IActorResponse> Call(
          this ActorMessageSenderComponent self,
          long actorId,
          int rpcId,
          MemoryStream memoryStream,
          bool needException = true
  )
  {
      if (actorId == 0)
      {
          throw new Exception($"actor id is 0: {memoryStream.ToActorMessage()}");
      }

      var tcs = new ETTaskCompletionSource<IActorResponse>();
      self.requestCallback.Add(rpcId, new ActorMessageSender(actorId, memoryStream, tcs, needException));

      try
      {
          self.Send(actorId, memoryStream);
      }
      catch
      {
          // The send failed and the caller gets that exception directly. Unregister the entry
          // so it does not linger until the timeout scan fails a task nobody is awaiting.
          self.requestCallback.Remove(rpcId);
          throw;
      }

      long beginTime = TimeHelper.ServerFrameTime();
      IActorResponse response = await tcs.Task;
      long endTime = TimeHelper.ServerFrameTime();

      // Warn about slow round trips. The threshold is in the units of ServerFrameTime
      // (presumably milliseconds; confirm against TimeHelper).
      long costTime = endTime - beginTime;
      if (costTime > 200)
      {
          Log.Warning("actor rpc time > 200: {0} {1}", costTime, memoryStream.ToActorMessage());
      }

      return response;
  }
  /// <summary>
  /// Delivers <paramref name="response"/> to the pending request registered under its
  /// <c>RpcId</c>; a response with no matching pending request is ignored.
  /// </summary>
  /// <param name="self">The sender component holding the pending requests.</param>
  /// <param name="actorId">Not used by this method.</param>
  /// <param name="response">The response that arrived.</param>
  public static void RunMessage(this ActorMessageSenderComponent self, long actorId, IActorResponse response)
  {
      var rpcId = response.RpcId;

      if (self.requestCallback.TryGetValue(rpcId, out ActorMessageSender pending))
      {
          // Unregister before completing, so the entry is already gone when continuations run.
          self.requestCallback.Remove(rpcId);
          Run(pending, response);
      }
  }
  143. }
  144. }