ActorMessageSenderComponentSystem.cs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. using System;
  2. using ETModel;
  3. namespace ETHotfix
  4. {
  5. [ObjectSystem]
  6. public class ActorMessageSenderComponentAwakeSystem : AwakeSystem<ActorMessageSenderComponent>
  7. {
  8. public override void Awake(ActorMessageSenderComponent self)
  9. {
  10. ActorMessageSenderComponent.Instance = self;
  11. self.TimeoutCheckTimer = TimerComponent.Instance.NewRepeatedTimer(10 * 1000, self.Check);
  12. }
  13. }
  14. [ObjectSystem]
  15. public class ActorMessageSenderComponentDestroySystem: DestroySystem<ActorMessageSenderComponent>
  16. {
  17. public override void Destroy(ActorMessageSenderComponent self)
  18. {
  19. ActorMessageSenderComponent.Instance = null;
  20. TimerComponent.Instance.Remove(self.TimeoutCheckTimer);
  21. self.TimeoutCheckTimer = 0;
  22. self.TimeoutActorMessageSenders.Clear();
  23. }
  24. }
  25. public static class ActorMessageSenderComponentSystem
  26. {
  27. public static void Check(this ActorMessageSenderComponent self)
  28. {
  29. long timeNow = TimeHelper.Now();
  30. foreach ((int key, ActorMessageSender value) in self.requestCallback)
  31. {
  32. if (timeNow < value.CreateTime + ActorMessageSenderComponent.TIMEOUT_TIME)
  33. {
  34. continue;
  35. }
  36. self.TimeoutActorMessageSenders.Add(key);
  37. }
  38. foreach (int rpcId in self.TimeoutActorMessageSenders)
  39. {
  40. ActorMessageSender actorMessageSender = self.requestCallback[rpcId];
  41. self.requestCallback.Remove(rpcId);
  42. Log.Error($"actor request timeout: {rpcId}");
  43. actorMessageSender.Callback.Invoke(new ActorResponse() {Error = ErrorCode.ERR_ActorTimeout});
  44. }
  45. self.TimeoutActorMessageSenders.Clear();
  46. }
  47. public static void Send(this ActorMessageSenderComponent self, long actorId, IActorMessage message)
  48. {
  49. if (actorId == 0)
  50. {
  51. throw new Exception($"actor id is 0: {MongoHelper.ToJson(message)}");
  52. }
  53. int process = IdGenerater.GetProcess(actorId);
  54. string address = StartProcessConfigCategory.Instance.Get(process).InnerAddress;
  55. Session session = NetInnerComponent.Instance.Get(address);
  56. message.ActorId = actorId;
  57. session.Send(message);
  58. }
  59. public static ETTask<IActorResponse> Call(this ActorMessageSenderComponent self, long actorId, IActorRequest message, bool exception = true)
  60. {
  61. if (actorId == 0)
  62. {
  63. throw new Exception($"actor id is 0: {MongoHelper.ToJson(message)}");
  64. }
  65. var tcs = new ETTaskCompletionSource<IActorResponse>();
  66. int process = IdGenerater.GetProcess(actorId);
  67. string address = StartProcessConfigCategory.Instance.Get(process).InnerAddress;
  68. Session session = NetInnerComponent.Instance.Get(address);
  69. InstanceIdStruct instanceIdStruct = new InstanceIdStruct(actorId);
  70. instanceIdStruct.Process = IdGenerater.Process;
  71. message.ActorId = instanceIdStruct.ToLong();
  72. message.RpcId = ++self.RpcId;
  73. self.requestCallback.Add(message.RpcId, new ActorMessageSender((response) =>
  74. {
  75. if (exception && ErrorCode.IsRpcNeedThrowException(response.Error))
  76. {
  77. tcs.SetException(new Exception($"Rpc error: {MongoHelper.ToJson(response)}"));
  78. return;
  79. }
  80. tcs.SetResult(response);
  81. }));
  82. session.Send(message);
  83. return tcs.Task;
  84. }
  85. public static async ETTask<IActorResponse> CallWithoutException(this ActorMessageSenderComponent self, long actorId, IActorRequest message)
  86. {
  87. return await self.Call(actorId, message, false);
  88. }
  89. public static void RunMessage(this ActorMessageSenderComponent self, IActorResponse response)
  90. {
  91. ActorMessageSender actorMessageSender;
  92. if (!self.requestCallback.TryGetValue(response.RpcId, out actorMessageSender))
  93. {
  94. Log.Error($"not found rpc, maybe request timeout, response message: {StringHelper.MessageToStr(response)}");
  95. return;
  96. }
  97. self.requestCallback.Remove(response.RpcId);
  98. actorMessageSender.Callback(response);
  99. }
  100. }
  101. }