|
@@ -30,35 +30,38 @@ namespace ET
|
|
|
Fiber fiber = self.Fiber();
|
|
Fiber fiber = self.Fiber();
|
|
|
ActorMessageQueue.Instance.Fetch(fiber.Id, 1000, self.list);
|
|
ActorMessageQueue.Instance.Fetch(fiber.Id, 1000, self.list);
|
|
|
|
|
|
|
|
- ActorInnerComponent actorInnerComponent = fiber.Root.GetComponent<ActorInnerComponent>();
|
|
|
|
|
foreach (ActorMessageInfo actorMessageInfo in self.list)
|
|
foreach (ActorMessageInfo actorMessageInfo in self.list)
|
|
|
{
|
|
{
|
|
|
- if (actorMessageInfo.MessageObject is IActorResponse response)
|
|
|
|
|
- {
|
|
|
|
|
- actorInnerComponent.HandleIActorResponse(response);
|
|
|
|
|
- continue;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ self.HandleMessage(fiber, actorMessageInfo);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- ActorId actorId = actorMessageInfo.ActorId;
|
|
|
|
|
- MessageObject message = actorMessageInfo.MessageObject;
|
|
|
|
|
|
|
+ private static void HandleMessage(this ActorInnerComponent self, Fiber fiber, in ActorMessageInfo actorMessageInfo)
|
|
|
|
|
+ {
|
|
|
|
|
+ if (actorMessageInfo.MessageObject is IActorResponse response)
|
|
|
|
|
+ {
|
|
|
|
|
+ self.HandleIActorResponse(response);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ ActorId actorId = actorMessageInfo.ActorId;
|
|
|
|
|
+ IActorMessage message = actorMessageInfo.MessageObject;
|
|
|
|
|
|
|
|
- MailBoxComponent mailBoxComponent = self.Fiber().Mailboxes.Get(actorId.InstanceId);
|
|
|
|
|
- if (mailBoxComponent == null)
|
|
|
|
|
|
|
+ MailBoxComponent mailBoxComponent = self.Fiber().Mailboxes.Get(actorId.InstanceId);
|
|
|
|
|
+ if (mailBoxComponent == null)
|
|
|
|
|
+ {
|
|
|
|
|
+ Log.Warning($"actor not found mailbox, from: {actorId} current: {fiber.Address} {message}");
|
|
|
|
|
+ if (message is IActorRequest request)
|
|
|
{
|
|
{
|
|
|
- Log.Warning($"actor not found mailbox, from: {actorId} current: {fiber.Address} {message}");
|
|
|
|
|
- if (message is IActorRequest request)
|
|
|
|
|
- {
|
|
|
|
|
- IActorResponse resp = ActorHelper.CreateResponse(request, ErrorCore.ERR_NotFoundActor);
|
|
|
|
|
- actorInnerComponent.Reply(actorId.Address, resp);
|
|
|
|
|
- }
|
|
|
|
|
- return;
|
|
|
|
|
|
|
+ IActorResponse resp = ActorHelper.CreateResponse(request, ErrorCore.ERR_NotFoundActor);
|
|
|
|
|
+ self.Reply(actorId.Address, resp);
|
|
|
}
|
|
}
|
|
|
- mailBoxComponent.Add(actorId.Address, message);
|
|
|
|
|
|
|
+ return;
|
|
|
}
|
|
}
|
|
|
|
|
+ mailBoxComponent.Add(actorId.Address, message);
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- public static void HandleIActorResponse(this ActorInnerComponent self, IActorResponse response)
|
|
|
|
|
|
|
+
|
|
|
|
|
+ private static void HandleIActorResponse(this ActorInnerComponent self, IActorResponse response)
|
|
|
{
|
|
{
|
|
|
if (!self.requestCallback.Remove(response.RpcId, out ActorMessageSender actorMessageSender))
|
|
if (!self.requestCallback.Remove(response.RpcId, out ActorMessageSender actorMessageSender))
|
|
|
{
|
|
{
|
|
@@ -86,23 +89,30 @@ namespace ET
|
|
|
|
|
|
|
|
public static void Reply(this ActorInnerComponent self, Address fromAddress, IActorResponse message)
|
|
public static void Reply(this ActorInnerComponent self, Address fromAddress, IActorResponse message)
|
|
|
{
|
|
{
|
|
|
- self.Send(new ActorId(fromAddress, 0), message);
|
|
|
|
|
|
|
+ self.SendInner(new ActorId(fromAddress, 0), message);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public static void Send(this ActorInnerComponent self, ActorId actorId, IActorMessage message)
|
|
public static void Send(this ActorInnerComponent self, ActorId actorId, IActorMessage message)
|
|
|
|
|
+ {
|
|
|
|
|
+ self.SendInner(actorId, message);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private static void SendInner(this ActorInnerComponent self, ActorId actorId, IActorMessage message)
|
|
|
{
|
|
{
|
|
|
Fiber fiber = self.Fiber();
|
|
Fiber fiber = self.Fiber();
|
|
|
|
|
+
|
|
|
// 如果发向同一个进程,则扔到消息队列中
|
|
// 如果发向同一个进程,则扔到消息队列中
|
|
|
if (actorId.Process != fiber.Process)
|
|
if (actorId.Process != fiber.Process)
|
|
|
{
|
|
{
|
|
|
throw new Exception($"actor inner process diff: {actorId.Process} {fiber.Process}");
|
|
throw new Exception($"actor inner process diff: {actorId.Process} {fiber.Process}");
|
|
|
}
|
|
}
|
|
|
- ActorMessageQueue.Instance.Send(fiber.Address, actorId, (MessageObject)message);
|
|
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
- private static void SendInner(this ActorInnerComponent self, ActorId actorId, MessageObject message)
|
|
|
|
|
- {
|
|
|
|
|
- Fiber fiber = self.Fiber();
|
|
|
|
|
|
|
+ if (actorId.Fiber == fiber.Id)
|
|
|
|
|
+ {
|
|
|
|
|
+ self.HandleMessage(fiber, new ActorMessageInfo() {ActorId = actorId, MessageObject = message});
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
ActorMessageQueue.Instance.Send(fiber.Address, actorId, message);
|
|
ActorMessageQueue.Instance.Send(fiber.Address, actorId, message);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -150,7 +160,7 @@ namespace ET
|
|
|
|
|
|
|
|
self.requestCallback.Add(rpcId, new ActorMessageSender(actorId, iActorRequest, tcs, needException));
|
|
self.requestCallback.Add(rpcId, new ActorMessageSender(actorId, iActorRequest, tcs, needException));
|
|
|
|
|
|
|
|
- self.SendInner(actorId, iActorRequest as MessageObject);
|
|
|
|
|
|
|
+ self.SendInner(actorId, iActorRequest);
|
|
|
|
|
|
|
|
|
|
|
|
|
async ETTask Timeout()
|
|
async ETTask Timeout()
|