// ActorMessageSenderComponentSystem.cs
  1. using System;
  2. namespace ET
  3. {
  /// <summary>
  /// Awake hook for <see cref="ActorMessageSenderComponent"/>: publishes the component as the
  /// static <c>Instance</c> and starts the repeating timer that drives the timeout check.
  /// </summary>
  public class ActorMessageSenderComponentAwakeSystem : AwakeSystem<ActorMessageSenderComponent>
  {
      /// <summary>
      /// Registers <paramref name="self"/> as the global instance and schedules <c>self.Check</c>
      /// on a repeated timer, storing the timer id in <c>TimeoutCheckTimer</c>.
      /// </summary>
      /// <param name="self">The component being awakened.</param>
      public override void Awake(ActorMessageSenderComponent self)
      {
          // Interval handed to the repeated timer (presumably milliseconds, i.e. 10 s — confirm against TimerComponent).
          const int checkInterval = 10 * 1000;

          ActorMessageSenderComponent.Instance = self;
          self.TimeoutCheckTimer = TimerComponent.Instance.NewRepeatedTimer(checkInterval, self.Check);
      }
  }
  /// <summary>
  /// Destroy hook for <see cref="ActorMessageSenderComponent"/>: unpublishes the static
  /// <c>Instance</c>, cancels the timeout-check timer and empties the timeout scratch list.
  /// </summary>
  public class ActorMessageSenderComponentDestroySystem : DestroySystem<ActorMessageSenderComponent>
  {
      /// <summary>
      /// Tears down the state that the Awake hook set up on <paramref name="self"/>.
      /// </summary>
      /// <param name="self">The component being destroyed.</param>
      public override void Destroy(ActorMessageSenderComponent self)
      {
          ActorMessageSenderComponent.Instance = null;

          // Cancel the repeated timer first, then reset the stored id so it cannot be reused.
          var timerId = self.TimeoutCheckTimer;
          TimerComponent.Instance.Remove(timerId);
          self.TimeoutCheckTimer = 0;

          self.TimeoutActorMessageSenders.Clear();
      }
  }
  /// <summary>
  /// Extension methods for <see cref="ActorMessageSenderComponent"/>: sending one-way actor
  /// messages, issuing request/response calls tracked by rpc id, dispatching responses to the
  /// waiting callback, and timing out requests that were never answered.
  /// </summary>
  public static class ActorMessageSenderComponentSystem
  {
      /// <summary>
      /// Timer callback: completes every pending request whose age has reached
      /// <c>ActorMessageSenderComponent.TIMEOUT_TIME</c> with an <c>ERR_ActorTimeout</c> response.
      /// </summary>
      /// <param name="self">The component whose pending requests are scanned.</param>
      /// <param name="isTimeOut">Supplied by the timer; not used by this method.</param>
      public static void Check(this ActorMessageSenderComponent self, bool isTimeOut)
      {
          long timeNow = TimeHelper.Now();

          // Pass 1: only collect ids, because requestCallback must not be modified while enumerating it.
          foreach ((int key, ActorMessageSender value) in self.requestCallback)
          {
              if (timeNow < value.CreateTime + ActorMessageSenderComponent.TIMEOUT_TIME)
              {
                  continue;
              }

              self.TimeoutActorMessageSenders.Add(key);
          }

          try
          {
              // Pass 2: remove each expired request and notify its waiter.
              foreach (int rpcId in self.TimeoutActorMessageSenders)
              {
                  // Defensive lookup: an id that is no longer registered must not abort the whole sweep.
                  if (!self.requestCallback.TryGetValue(rpcId, out ActorMessageSender actorMessageSender))
                  {
                      continue;
                  }

                  self.requestCallback.Remove(rpcId);
                  Log.Error($"actor request timeout: {rpcId}");

                  try
                  {
                      actorMessageSender.Callback.Invoke(new ActorResponse() {Error = ErrorCode.ERR_ActorTimeout});
                  }
                  catch (Exception e)
                  {
                      // One faulty callback must not prevent the remaining timed-out requests from being completed.
                      Log.Error($"actor request timeout callback error: {rpcId} {e}");
                  }
              }
          }
          finally
          {
              // Always reset the scratch list; stale ids would otherwise be re-processed on the next tick.
              self.TimeoutActorMessageSenders.Clear();
          }
      }

      /// <summary>
      /// Sends a one-way actor message to the process that owns <paramref name="actorId"/>.
      /// </summary>
      /// <param name="self">The sender component (not otherwise used by this method).</param>
      /// <param name="actorId">Target actor id; must not be 0.</param>
      /// <param name="message">Message to send; its <c>ActorId</c> is overwritten with <paramref name="actorId"/>.</param>
      /// <exception cref="Exception">Thrown when <paramref name="actorId"/> is 0.</exception>
      public static void Send(this ActorMessageSenderComponent self, long actorId, IActorMessage message)
      {
          if (actorId == 0)
          {
              throw new Exception($"actor id is 0: {MongoHelper.ToJson(message)}");
          }

          Session session = GetInnerSession(actorId);
          message.ActorId = actorId;
          session.Send(message);
      }

      /// <summary>
      /// Sends an actor request and returns a task that completes when the matching response is
      /// passed to <see cref="RunMessage"/> or when <see cref="Check"/> times the request out.
      /// </summary>
      /// <param name="self">The component that allocates the rpc id and tracks the pending request.</param>
      /// <param name="actorId">Target actor id; must not be 0.</param>
      /// <param name="message">Request to send; its <c>ActorId</c> and <c>RpcId</c> are overwritten.</param>
      /// <param name="exception">
      /// When true, a response whose error satisfies <c>ErrorCode.IsRpcNeedThrowException</c>
      /// faults the task instead of being returned as a result.
      /// </param>
      /// <returns>A task yielding the response.</returns>
      /// <exception cref="Exception">Thrown when <paramref name="actorId"/> is 0.</exception>
      public static ETTask<IActorResponse> Call(this ActorMessageSenderComponent self, long actorId, IActorRequest message, bool exception = true)
      {
          if (actorId == 0)
          {
              throw new Exception($"actor id is 0: {MongoHelper.ToJson(message)}");
          }

          var tcs = new ETTaskCompletionSource<IActorResponse>();
          Session session = GetInnerSession(actorId);

          // The id written into the message carries this process (IdGenerater.Process) in its Process field;
          // presumably this lets the receiver route the response back — confirm against the receiving side.
          InstanceIdStruct instanceIdStruct = new InstanceIdStruct(actorId);
          instanceIdStruct.Process = IdGenerater.Process;
          message.ActorId = instanceIdStruct.ToLong();
          message.RpcId = ++self.RpcId;

          // Register the waiter before sending so a response can always find its callback.
          self.requestCallback.Add(message.RpcId, new ActorMessageSender((response) =>
          {
              if (exception && ErrorCode.IsRpcNeedThrowException(response.Error))
              {
                  tcs.SetException(new Exception($"Rpc error: {MongoHelper.ToJson(response)}"));
                  return;
              }

              tcs.SetResult(response);
          }));

          session.Send(message);
          return tcs.Task;
      }

      /// <summary>
      /// Same as <see cref="Call"/> with <c>exception</c> set to false: error responses are
      /// returned to the caller as results instead of faulting the task.
      /// </summary>
      /// <param name="self">The component that tracks the pending request.</param>
      /// <param name="actorId">Target actor id; must not be 0.</param>
      /// <param name="message">Request to send.</param>
      /// <returns>A task yielding the response, whatever its error code.</returns>
      public static async ETTask<IActorResponse> CallWithoutException(this ActorMessageSenderComponent self, long actorId, IActorRequest message)
      {
          return await self.Call(actorId, message, false);
      }

      /// <summary>
      /// Delivers a received response to the callback registered under its rpc id and removes
      /// that registration. Logs an error and returns if no request is pending for the id.
      /// </summary>
      /// <param name="self">The component holding the pending requests.</param>
      /// <param name="response">The response that arrived.</param>
      public static void RunMessage(this ActorMessageSenderComponent self, IActorResponse response)
      {
          if (!self.requestCallback.TryGetValue(response.RpcId, out ActorMessageSender actorMessageSender))
          {
              Log.Error($"not found rpc, maybe request timeout, response message: {StringHelper.MessageToStr(response)}");
              return;
          }

          // Unregister before invoking so the callback runs at most once per request.
          self.requestCallback.Remove(response.RpcId);
          actorMessageSender.Callback(response);
      }

      /// <summary>
      /// Resolves the inner-network session for the process encoded in <paramref name="actorId"/>.
      /// </summary>
      private static Session GetInnerSession(long actorId)
      {
          int process = IdGenerater.GetProcess(actorId);
          string address = StartProcessConfigCategory.Instance.Get(process).InnerAddress;
          return NetInnerComponent.Instance.Get(address);
      }
  }
  98. }