MessageSender.cs 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. using System;
  2. using System.Collections.Generic;
  3. namespace ET.Server
  4. {
  /// <summary>
  /// Extension methods for <see cref="MessageSender"/>: one-way sends and request/response
  /// calls addressed by <c>ActorId</c>. Targets in the current process go through the
  /// <c>ProcessInnerSender</c> component on the fiber root; all other targets are wrapped
  /// and handed to the NetInner actor of the current process.
  /// </summary>
  [FriendOf(typeof(MessageSender))]
  public static partial class MessageSenderSystem
  {
      /// <summary>
      /// Sends a one-way message to the given actor.
      /// </summary>
      /// <param name="self">The sender component; its fiber determines the local process.</param>
      /// <param name="actorId">Address of the target actor.</param>
      /// <param name="message">The message to deliver.</param>
      public static void Send(this MessageSender self, ActorId actorId, IMessage message)
      {
          Fiber fiber = self.Fiber();

          // Target lives in this process: hand the message to the in-process sender.
          if (actorId.Process == fiber.Process)
          {
              fiber.Root.GetComponent<ProcessInnerSender>().Send(actorId, message);
              return;
          }

          // Target lives elsewhere: wrap the message and pass it to the NetInner fiber.
          A2NetInner_Message a2NetInnerMessage = A2NetInner_Message.Create();
          a2NetInnerMessage.FromAddress = fiber.Address;
          a2NetInnerMessage.ActorId = actorId;
          a2NetInnerMessage.MessageObject = message;
          MessageQueue.Instance.Send(new ActorId(fiber.Process, SceneType.NetInner), a2NetInnerMessage);
      }

      /// <summary>
      /// Increments the component's <c>RpcId</c> counter and returns the new value.
      /// </summary>
      private static int GetRpcId(this MessageSender self)
      {
          return ++self.RpcId;
      }

      /// <summary>
      /// Sends a request to the given actor and awaits its response.
      /// </summary>
      /// <param name="self">The sender component; its fiber determines the local process.</param>
      /// <param name="actorId">Address of the target actor; must not be the default value.</param>
      /// <param name="request">The request to send.</param>
      /// <param name="needException">
      /// When true, a response whose error code satisfies
      /// <c>ErrorCore.IsRpcNeedThrowException</c> is raised as an <c>RpcException</c>
      /// instead of being returned. A timeout error is always raised, regardless of this flag.
      /// </param>
      /// <returns>The response produced for <paramref name="request"/>.</returns>
      /// <exception cref="Exception"><paramref name="actorId"/> is the default value.</exception>
      /// <exception cref="InvalidCastException">
      /// The NetInner actor replied with null or with something other than an
      /// <c>A2NetInner_Response</c>.
      /// </exception>
      public static async ETTask<IResponse> Call(
              this MessageSender self,
              ActorId actorId,
              IRequest request,
              bool needException = true
      )
      {
          if (actorId == default)
          {
              throw new Exception($"actor id is 0: {request}");
          }

          Fiber fiber = self.Fiber();

          IResponse response;
          if (fiber.Process == actorId.Process)
          {
              response = await fiber.Root.GetComponent<ProcessInnerSender>().Call(actorId, request, needException: needException);
          }
          else
          {
              // Wrap the request and route it through the NetInner fiber of this process.
              A2NetInner_Request a2NetInner_Request = A2NetInner_Request.Create();
              a2NetInner_Request.ActorId = actorId;
              a2NetInner_Request.MessageObject = request;

              IResponse netInnerReply = await fiber.Root.GetComponent<ProcessInnerSender>().Call(
                  new ActorId(fiber.Process, SceneType.NetInner), a2NetInner_Request);

              // Fail with context instead of a bare NullReferenceException when the reply
              // is missing or is not the expected wrapper type.
              if (netInnerReply is not A2NetInner_Response a2NetInnerResponse)
              {
                  throw new InvalidCastException(
                      $"NetInner returned an unexpected response: actorId: {actorId} {request}, response: {netInnerReply}");
              }

              // Dispose the wrapper once the inner response has been taken out of it.
              using (a2NetInnerResponse)
              {
                  response = a2NetInnerResponse.MessageObject;
              }
          }

          // Timeouts are always surfaced as exceptions, even when needException is false.
          if (response.Error == ErrorCore.ERR_MessageTimeout)
          {
              throw new RpcException(response.Error, $"Rpc error: request, 注意Actor消息超时,请注意查看是否死锁或者没有reply: actorId: {actorId} {request}, response: {response}");
          }

          if (needException && ErrorCore.IsRpcNeedThrowException(response.Error))
          {
              throw new RpcException(response.Error, $"Rpc error: actorId: {actorId} {request}, response: {response}");
          }

          return response;
      }
  }
  /// <summary>
  /// Component attached to a <c>Scene</c> that holds the state used by the
  /// <c>MessageSenderSystem</c> extension methods.
  /// </summary>
  [ComponentOf(typeof(Scene))]
  public class MessageSender : Entity, IAwake, IDestroy
  {
      // Timeout constant for requests; presumably milliseconds (40 seconds) - TODO confirm against its users.
      public const long TIMEOUT_TIME = 40 * 1000;

      // Counter that MessageSenderSystem.GetRpcId pre-increments to produce the next id.
      public int RpcId;

      // Entries keyed by an int id; presumably the rpc id of an outstanding request - verify against callers.
      public readonly Dictionary<int, MessageSenderStruct> requestCallback = new Dictionary<int, MessageSenderStruct>();
  }
  73. }