ActorMessageSenderComponentSystem.cs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. using System;
  2. using ETModel;
  3. namespace ETHotfix
  4. {
  5. [ObjectSystem]
  6. public class ActorMessageSenderComponentAwakeSystem : AwakeSystem<ActorMessageSenderComponent>
  7. {
  8. public override void Awake(ActorMessageSenderComponent self)
  9. {
  10. ActorMessageSenderComponent.Instance = self;
  11. self.TimeoutCheckTimer = TimerComponent.Instance.NewRepeatedTimer(10 * 1000, self.Check);
  12. }
  13. }
  14. [ObjectSystem]
  15. public class ActorMessageSenderComponentDestroySystem: DestroySystem<ActorMessageSenderComponent>
  16. {
  17. public override void Destroy(ActorMessageSenderComponent self)
  18. {
  19. ActorMessageSenderComponent.Instance = null;
  20. TimerComponent.Instance.Remove(self.TimeoutCheckTimer);
  21. self.TimeoutCheckTimer = 0;
  22. self.TimeoutActorMessageSenders.Clear();
  23. }
  24. }
  25. public static class ActorMessageSenderComponentSystem
  26. {
  27. public static void Check(this ActorMessageSenderComponent self)
  28. {
  29. long timeNow = TimeHelper.Now();
  30. foreach ((int key, ActorMessageSender value) in self.requestCallback)
  31. {
  32. if (timeNow < value.CreateTime + ActorMessageSenderComponent.TIMEOUT_TIME)
  33. {
  34. continue;
  35. }
  36. self.TimeoutActorMessageSenders.Add(key);
  37. }
  38. foreach (int rpcId in self.TimeoutActorMessageSenders)
  39. {
  40. ActorMessageSender actorMessageSender = self.requestCallback[rpcId];
  41. self.requestCallback.Remove(rpcId);
  42. Log.Error($"actor request timeout: {rpcId}");
  43. actorMessageSender.Callback.Invoke(new ActorResponse() {Error = ErrorCode.ERR_ActorTimeout});
  44. }
  45. self.TimeoutActorMessageSenders.Clear();
  46. }
  47. public static void Send(this ActorMessageSenderComponent self, long actorId, IActorMessage message)
  48. {
  49. if (actorId == 0)
  50. {
  51. throw new Exception($"actor id is 0: {MongoHelper.ToJson(message)}");
  52. }
  53. string address = StartConfigComponent.Instance.GetProcessInnerAddress(IdGenerater.GetProcessId(actorId));
  54. Session session = NetInnerComponent.Instance.Get(address);
  55. message.ActorId = actorId;
  56. session.Send(message);
  57. }
  58. public static ETTask<IActorResponse> Call(this ActorMessageSenderComponent self, long actorId, IActorRequest message)
  59. {
  60. if (actorId == 0)
  61. {
  62. throw new Exception($"actor id is 0: {MongoHelper.ToJson(message)}");
  63. }
  64. string address = StartConfigComponent.Instance.GetProcessInnerAddress(IdGenerater.GetProcessId(actorId));
  65. Session session = NetInnerComponent.Instance.Get(address);
  66. message.ActorId = actorId & IdGenerater.HeadMask | IdGenerater.Head;
  67. message.RpcId = ++self.RpcId;
  68. var tcs = new ETTaskCompletionSource<IActorResponse>();
  69. self.requestCallback.Add(message.RpcId, new ActorMessageSender((response) =>
  70. {
  71. if (ErrorCode.IsRpcNeedThrowException(response.Error))
  72. {
  73. tcs.SetException(new Exception($"Rpc error: {MongoHelper.ToJson(response)}"));
  74. return;
  75. }
  76. tcs.SetResult(response);
  77. }));
  78. session.Send(message);
  79. return tcs.Task;
  80. }
  81. public static ETTask<IActorResponse> CallWithoutException(this ActorMessageSenderComponent self, long actorId, IActorRequest message)
  82. {
  83. if (actorId == 0)
  84. {
  85. throw new Exception($"actor id is 0: {MongoHelper.ToJson(message)}");
  86. }
  87. string address = StartConfigComponent.Instance.GetProcessInnerAddress(IdGenerater.GetProcessId(actorId));
  88. Session session = NetInnerComponent.Instance.Get(address);
  89. message.ActorId = actorId & IdGenerater.HeadMask | IdGenerater.Head;
  90. message.RpcId = ++self.RpcId;
  91. var tcs = new ETTaskCompletionSource<IActorResponse>();
  92. self.requestCallback.Add(message.RpcId, new ActorMessageSender((response) =>
  93. {
  94. tcs.SetResult(response);
  95. }));
  96. session.Send(message);
  97. return tcs.Task;
  98. }
  99. public static void RunMessage(this ActorMessageSenderComponent self, IActorResponse response)
  100. {
  101. ActorMessageSender actorMessageSender;
  102. if (!self.requestCallback.TryGetValue(response.RpcId, out actorMessageSender))
  103. {
  104. Log.Error($"not found rpc, maybe request timeout, response message: {StringHelper.MessageToStr(response)}");
  105. return;
  106. }
  107. self.requestCallback.Remove(response.RpcId);
  108. actorMessageSender.Callback(response);
  109. }
  110. }
  111. }