ActorMessageSenderComponentSystem.cs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. using System;
  2. namespace ET
  3. {
  /// <summary>
  /// Awake hook for <see cref="ActorMessageSenderComponent"/>: publishes the component
  /// through its static Instance field and starts the repeating timeout-check timer.
  /// </summary>
  [ObjectSystem]
  public class ActorMessageSenderComponentAwakeSystem : AwakeSystem<ActorMessageSenderComponent>
  {
      public override void Awake(ActorMessageSenderComponent self)
      {
          // Value handed to NewRepeatedTimer as the repeat period (10 * 1000; presumably milliseconds).
          const int checkInterval = 10 * 1000;

          ActorMessageSenderComponent.Instance = self;

          // self.Check is invoked by the timer; the returned id is kept so Destroy can remove it.
          self.TimeoutCheckTimer = TimerComponent.Instance.NewRepeatedTimer(checkInterval, self.Check);
      }
  }
  /// <summary>
  /// Destroy hook for <see cref="ActorMessageSenderComponent"/>: unpublishes the static
  /// Instance, removes the timeout-check timer and empties the timeout scratch list.
  /// </summary>
  [ObjectSystem]
  public class ActorMessageSenderComponentDestroySystem : DestroySystem<ActorMessageSenderComponent>
  {
      public override void Destroy(ActorMessageSenderComponent self)
      {
          ActorMessageSenderComponent.Instance = null;

          // Remove the timer that Awake registered, then reset the stored id.
          var timerId = self.TimeoutCheckTimer;
          TimerComponent.Instance.Remove(timerId);
          self.TimeoutCheckTimer = 0;

          self.TimeoutActorMessageSenders.Clear();
      }
  }
  /// <summary>
  /// Extension methods implementing the behaviour of <see cref="ActorMessageSenderComponent"/>:
  /// sending one-way actor messages, issuing actor RPC calls, dispatching responses to the
  /// pending callbacks, and timing out calls that never received a response.
  /// </summary>
  public static class ActorMessageSenderComponentSystem
  {
      /// <summary>
      /// Timer callback: completes every pending request whose age has reached
      /// <c>ActorMessageSenderComponent.TIMEOUT_TIME</c> with an <c>ERR_ActorTimeout</c> response.
      /// </summary>
      /// <param name="self">The sender component whose pending requests are inspected.</param>
      /// <param name="isTimeOut">Supplied by the timer; not used by this method.</param>
      public static void Check(this ActorMessageSenderComponent self, bool isTimeOut)
      {
          long timeNow = TimeHelper.Now();

          try
          {
              // Collect first: requestCallback must not be modified while it is being enumerated.
              foreach ((int key, ActorMessageSender value) in self.requestCallback)
              {
                  if (timeNow < value.CreateTime + ActorMessageSenderComponent.TIMEOUT_TIME)
                  {
                      continue;
                  }

                  self.TimeoutActorMessageSenders.Add(key);
              }

              foreach (int rpcId in self.TimeoutActorMessageSenders)
              {
                  // The entry may already be gone (e.g. completed by an earlier callback in this pass).
                  if (!self.requestCallback.TryGetValue(rpcId, out ActorMessageSender actorMessageSender))
                  {
                      continue;
                  }

                  self.requestCallback.Remove(rpcId);
                  Log.Error($"actor request timeout: {rpcId}");

                  try
                  {
                      actorMessageSender.Callback.Invoke(new ActorResponse() {Error = ErrorCode.ERR_ActorTimeout});
                  }
                  catch (Exception e)
                  {
                      // One failing callback must not prevent the remaining requests from timing out.
                      Log.Error(e.ToString());
                  }
              }
          }
          finally
          {
              // Always reset the scratch list; stale ids would otherwise be re-processed on the next tick.
              self.TimeoutActorMessageSenders.Clear();
          }
      }

      /// <summary>
      /// Sends a one-way actor message to the process that owns <paramref name="actorId"/>.
      /// </summary>
      /// <param name="self">The sender component.</param>
      /// <param name="actorId">Target actor id; must be non-zero.</param>
      /// <param name="message">Message to send; its ActorId is overwritten with <paramref name="actorId"/>.</param>
      /// <exception cref="Exception">Thrown when <paramref name="actorId"/> is 0.</exception>
      public static void Send(this ActorMessageSenderComponent self, long actorId, IActorMessage message)
      {
          if (actorId == 0)
          {
              throw new Exception($"actor id is 0: {MongoHelper.ToJson(message)}");
          }

          Session session = GetSession(actorId);
          message.ActorId = actorId;
          session.Send(message);
      }

      /// <summary>
      /// Sends an actor request and returns a task that completes when <see cref="RunMessage"/>
      /// receives the matching response, or when <see cref="Check"/> times the request out.
      /// </summary>
      /// <param name="self">The sender component.</param>
      /// <param name="actorId">Target actor id; must be non-zero.</param>
      /// <param name="message">Request to send; its ActorId and RpcId are overwritten.</param>
      /// <param name="exception">
      /// When true, a response whose error satisfies <c>ErrorCode.IsRpcNeedThrowException</c>
      /// faults the task instead of being returned as a result.
      /// </param>
      /// <returns>A task yielding the response.</returns>
      /// <exception cref="Exception">Thrown synchronously when <paramref name="actorId"/> is 0.</exception>
      public static ETTask<IActorResponse> Call(this ActorMessageSenderComponent self, long actorId, IActorRequest message, bool exception = true)
      {
          if (actorId == 0)
          {
              throw new Exception($"actor id is 0: {MongoHelper.ToJson(message)}");
          }

          var tcs = new ETTaskCompletionSource<IActorResponse>();
          Session session = GetSession(actorId);

          // The id placed on the wire carries this process's id in its Process field
          // (presumably so the receiver knows where to reply - confirm against the receiving side).
          InstanceIdStruct instanceIdStruct = new InstanceIdStruct(actorId);
          instanceIdStruct.Process = IdGenerater.Process;
          message.ActorId = instanceIdStruct.ToLong();
          message.RpcId = ++self.RpcId;

          // Register the completion callback before sending so the response can always be matched.
          self.requestCallback.Add(message.RpcId, new ActorMessageSender((response) =>
          {
              if (exception && ErrorCode.IsRpcNeedThrowException(response.Error))
              {
                  tcs.SetException(new Exception($"Rpc error: {MongoHelper.ToJson(response)}"));
                  return;
              }

              tcs.SetResult(response);
          }));

          session.Send(message);
          return tcs.Task;
      }

      /// <summary>
      /// Same as <see cref="Call"/> with <c>exception</c> set to false: error responses are
      /// returned to the caller instead of faulting the task.
      /// </summary>
      public static async ETTask<IActorResponse> CallWithoutException(this ActorMessageSenderComponent self, long actorId, IActorRequest message)
      {
          return await self.Call(actorId, message, false);
      }

      /// <summary>
      /// Dispatches a received response to the callback registered under its RpcId.
      /// Logs an error and returns when no such request is pending.
      /// </summary>
      /// <param name="self">The sender component.</param>
      /// <param name="response">The response that was received.</param>
      public static void RunMessage(this ActorMessageSenderComponent self, IActorResponse response)
      {
          if (!self.requestCallback.TryGetValue(response.RpcId, out ActorMessageSender actorMessageSender))
          {
              Log.Error($"not found rpc, maybe request timeout, response message: {StringHelper.MessageToStr(response)}");
              return;
          }

          // Remove before invoking so the entry is gone even if the callback throws.
          self.requestCallback.Remove(response.RpcId);
          actorMessageSender.Callback(response);
      }

      // Resolves the inner-network session for the process encoded in actorId.
      private static Session GetSession(long actorId)
      {
          int process = IdGenerater.GetProcess(actorId);
          string address = StartProcessConfigCategory.Instance.Get(process).InnerAddress;
          return NetInnerComponent.Instance.Get(address);
      }
  }
  100. }