|
|
@@ -4,40 +4,22 @@ using System.IO;
|
|
|
|
|
|
namespace ET
|
|
|
{
|
|
|
- [FriendOf(typeof(ActorSenderComponent))]
|
|
|
- public static partial class ActorSenderComponentSystem
|
|
|
+ [FriendOf(typeof(ActorInnerComponent))]
|
|
|
+ public static partial class ActorInnerComponentSystem
|
|
|
{
|
|
|
- [Invoke(TimerInvokeType.ActorMessageSenderChecker)]
|
|
|
- public class ActorMessageSenderChecker: ATimer<ActorSenderComponent>
|
|
|
+ public static void HandleIActorResponse(this ActorInnerComponent self, IActorResponse response)
|
|
|
{
|
|
|
- protected override void Run(ActorSenderComponent self)
|
|
|
+ ActorMessageSender actorMessageSender;
|
|
|
+ if (!self.requestCallback.TryGetValue(response.RpcId, out actorMessageSender))
|
|
|
{
|
|
|
- try
|
|
|
- {
|
|
|
- self.Check();
|
|
|
- }
|
|
|
- catch (Exception e)
|
|
|
- {
|
|
|
- Log.Error($"move timer error: {self.Id}\n{e}");
|
|
|
- }
|
|
|
+ return;
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- [EntitySystem]
|
|
|
- private static void Awake(this ActorSenderComponent self, SceneType sceneType)
|
|
|
- {
|
|
|
- self.SceneType = sceneType;
|
|
|
- self.TimeoutCheckTimer = self.Fiber().TimerComponent.NewRepeatedTimer(1000, TimerInvokeType.ActorMessageSenderChecker, self);
|
|
|
+
|
|
|
+ self.requestCallback.Remove(response.RpcId);
|
|
|
+
|
|
|
+ Run(actorMessageSender, response);
|
|
|
}
|
|
|
|
|
|
- [EntitySystem]
|
|
|
- private static void Destroy(this ActorSenderComponent self)
|
|
|
- {
|
|
|
- self.Fiber().TimerComponent.Remove(ref self.TimeoutCheckTimer);
|
|
|
- self.TimeoutCheckTimer = 0;
|
|
|
- self.TimeoutActorMessageSenders.Clear();
|
|
|
- }
|
|
|
-
|
|
|
private static void Run(ActorMessageSender self, IActorResponse response)
|
|
|
{
|
|
|
if (response.Error == ErrorCore.ERR_ActorTimeout)
|
|
|
@@ -54,50 +36,18 @@ namespace ET
|
|
|
|
|
|
self.Tcs.SetResult(response);
|
|
|
}
|
|
|
-
|
|
|
- private static void Check(this ActorSenderComponent self)
|
|
|
- {
|
|
|
- long timeNow = self.Fiber().TimeInfo.ServerNow();
|
|
|
- foreach ((int key, ActorMessageSender value) in self.requestCallback)
|
|
|
- {
|
|
|
- // 因为是顺序发送的,所以,检测到第一个不超时的就退出
|
|
|
- if (timeNow < value.CreateTime + ActorSenderComponent.TIMEOUT_TIME)
|
|
|
- {
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- self.TimeoutActorMessageSenders.Add(key);
|
|
|
- }
|
|
|
-
|
|
|
- foreach (int rpcId in self.TimeoutActorMessageSenders)
|
|
|
- {
|
|
|
- ActorMessageSender actorMessageSender = self.requestCallback[rpcId];
|
|
|
- self.requestCallback.Remove(rpcId);
|
|
|
- try
|
|
|
- {
|
|
|
- IActorResponse response = ActorHelper.CreateResponse(actorMessageSender.Request, ErrorCore.ERR_ActorTimeout);
|
|
|
- Run(actorMessageSender, response);
|
|
|
- }
|
|
|
- catch (Exception e)
|
|
|
- {
|
|
|
- Log.Error(e.ToString());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- self.TimeoutActorMessageSenders.Clear();
|
|
|
- }
|
|
|
|
|
|
- public static void Reply(this ActorSenderComponent self, Address fromAddress, IActorResponse message)
|
|
|
+ public static void Reply(this ActorInnerComponent self, Address fromAddress, IActorResponse message)
|
|
|
{
|
|
|
self.SendInner(new ActorId(fromAddress, 0), message as MessageObject);
|
|
|
}
|
|
|
|
|
|
- public static void Send(this ActorSenderComponent self, ActorId actorId, IActorMessage message)
|
|
|
+ public static void Send(this ActorInnerComponent self, ActorId actorId, IActorMessage message)
|
|
|
{
|
|
|
self.SendInner(actorId, message as MessageObject);
|
|
|
}
|
|
|
|
|
|
- private static void SendInner(this ActorSenderComponent self, ActorId actorId, MessageObject message)
|
|
|
+ private static void SendInner(this ActorInnerComponent self, ActorId actorId, MessageObject message)
|
|
|
{
|
|
|
if (actorId == default)
|
|
|
{
|
|
|
@@ -106,22 +56,21 @@ namespace ET
|
|
|
|
|
|
Fiber fiber = self.Fiber();
|
|
|
// 如果发向同一个进程,则扔到消息队列中
|
|
|
- if (actorId.Process == fiber.Process)
|
|
|
+ if (actorId.Process != fiber.Process)
|
|
|
{
|
|
|
- ActorMessageQueue.Instance.Send(fiber.Address, actorId, message);
|
|
|
- return;
|
|
|
+ throw new Exception($"actor is not the same process: {fiber.Process} {actorId.Process}");
|
|
|
}
|
|
|
|
|
|
- EventSystem.Instance.Invoke((long)self.SceneType, new ActorSenderInvoker() {Fiber = fiber, ActorId = actorId, MessageObject = message});
|
|
|
+ ActorMessageQueue.Instance.Send(fiber.Address, actorId, message);
|
|
|
}
|
|
|
|
|
|
- public static int GetRpcId(this ActorSenderComponent self)
|
|
|
+ public static int GetRpcId(this ActorInnerComponent self)
|
|
|
{
|
|
|
return ++self.RpcId;
|
|
|
}
|
|
|
|
|
|
public static async ETTask<IActorResponse> Call(
|
|
|
- this ActorSenderComponent self,
|
|
|
+ this ActorInnerComponent self,
|
|
|
ActorId actorId,
|
|
|
IActorRequest request,
|
|
|
bool needException = true
|
|
|
@@ -138,7 +87,7 @@ namespace ET
|
|
|
}
|
|
|
|
|
|
public static async ETTask<IActorResponse> Call(
|
|
|
- this ActorSenderComponent self,
|
|
|
+ this ActorInnerComponent self,
|
|
|
ActorId actorId,
|
|
|
int rpcId,
|
|
|
IActorRequest iActorRequest,
|
|
|
@@ -153,12 +102,29 @@ namespace ET
|
|
|
var tcs = ETTask<IActorResponse>.Create(true);
|
|
|
|
|
|
Fiber fiber = self.Fiber();
|
|
|
- self.requestCallback.Add(rpcId, new ActorMessageSender(actorId, iActorRequest, tcs, needException, fiber.TimeInfo.ServerFrameTime()));
|
|
|
+ self.requestCallback.Add(rpcId, new ActorMessageSender(actorId, iActorRequest, tcs, needException));
|
|
|
|
|
|
self.SendInner(actorId, iActorRequest as MessageObject);
|
|
|
|
|
|
+
|
|
|
+ async ETTask Timeout()
|
|
|
+ {
|
|
|
+ await fiber.TimerComponent.WaitAsync(ActorInnerComponent.TIMEOUT_TIME);
|
|
|
+ if (!self.requestCallback.TryGetValue(rpcId, out ActorMessageSender action))
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ self.requestCallback.Remove(rpcId);
|
|
|
+ action.Tcs.SetException(new Exception($"actor sender timeout: {iActorRequest}"));
|
|
|
+ }
|
|
|
+
|
|
|
+ Timeout().Coroutine();
|
|
|
+
|
|
|
long beginTime = fiber.TimeInfo.ServerFrameTime();
|
|
|
+
|
|
|
IActorResponse response = await tcs;
|
|
|
+
|
|
|
long endTime = fiber.TimeInfo.ServerFrameTime();
|
|
|
|
|
|
long costTime = endTime - beginTime;
|
|
|
@@ -169,18 +135,5 @@ namespace ET
|
|
|
|
|
|
return response;
|
|
|
}
|
|
|
-
|
|
|
- public static void HandleIActorResponse(this ActorSenderComponent self, IActorResponse response)
|
|
|
- {
|
|
|
- ActorMessageSender actorMessageSender;
|
|
|
- if (!self.requestCallback.TryGetValue(response.RpcId, out actorMessageSender))
|
|
|
- {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- self.requestCallback.Remove(response.RpcId);
|
|
|
-
|
|
|
- Run(actorMessageSender, response);
|
|
|
- }
|
|
|
}
|
|
|
}
|