ProcessOuterSender.cs 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Net;
  4. namespace ET.Server
  5. {
  6. [EntitySystemOf(typeof(ProcessOuterSender))]
  7. public static partial class ProcessOuterSenderSystem
  8. {
  /// <summary>
  /// Creates the network service selected by <c>self.InnerProtocol</c>, bound to
  /// <paramref name="address"/>, and wires its accept/read/error callbacks to this component.
  /// </summary>
  /// <param name="self">The component being initialized.</param>
  /// <param name="address">Endpoint handed to the service constructor.</param>
  /// <exception cref="NotSupportedException">
  /// <c>self.InnerProtocol</c> is neither TCP nor KCP, so no service can be created.
  /// </exception>
  [EntitySystem]
  private static void Awake(this ProcessOuterSender self, IPEndPoint address)
  {
      switch (self.InnerProtocol)
      {
          case NetworkProtocol.TCP:
          {
              self.AService = new TService(address, ServiceType.Inner);
              break;
          }
          case NetworkProtocol.KCP:
          {
              // The KCP service is constructed with the UDP protocol value.
              self.AService = new KService(address, NetworkProtocol.UDP, ServiceType.Inner);
              break;
          }
          default:
          {
              // Without this branch AService stays unassigned and the callback wiring
              // below fails with an uninformative NullReferenceException.
              throw new NotSupportedException($"unsupported inner protocol: {self.InnerProtocol}");
          }
      }

      self.AService.AcceptCallback = self.OnAccept;
      self.AService.ReadCallback = self.OnRead;
      self.AService.ErrorCallback = self.OnError;
  }
  /// <summary>
  /// Per-frame tick: forwards the update to the underlying network service.
  /// </summary>
  /// <param name="self">The component whose service is updated.</param>
  [EntitySystem]
  private static void Update(this ProcessOuterSender self)
  {
      AService service = self.AService;
      service.Update();
  }
  /// <summary>
  /// Tear-down hook: disposes the underlying network service.
  /// </summary>
  /// <param name="self">The component being destroyed.</param>
  [EntitySystem]
  private static void Destroy(this ProcessOuterSender self)
  {
      AService service = self.AService;
      service.Dispose();
  }
  /// <summary>
  /// Read callback of the network service. Decodes the incoming buffer and either
  /// completes a pending outgoing call (when the message is a response) or relays the
  /// message to the local <c>ProcessInnerSender</c>.
  /// </summary>
  /// <param name="self">The owning component.</param>
  /// <param name="channelId">Id of the child session the data arrived on.</param>
  /// <param name="memoryBuffer">Raw buffer passed to <c>MessageSerializeHelper.ToMessage</c>.</param>
  private static void OnRead(this ProcessOuterSender self, long channelId, MemoryBuffer memoryBuffer)
  {
      Session source = self.GetChild<Session>(channelId);
      if (source == null)
      {
          // No session registered under this channel id: drop the data.
          return;
      }

      source.LastRecvTime = TimeInfo.Instance.ClientFrameTime();

      (ActorId actorId, object message) = MessageSerializeHelper.ToMessage(self.AService, memoryBuffer);

      // A response belongs to a call issued from this process; hand it to the waiter.
      if (message is IResponse reply)
      {
          self.HandleIActorResponse(reply);
          return;
      }

      Fiber fiber = self.Fiber();

      // Remember where the message came from, then retarget it at the local process.
      int senderProcess = actorId.Process;
      actorId.Process = fiber.Process;

      bool expectsReply = message is ILocationRequest || message is IRequest;
      if (!expectsReply)
      {
          // One-way message: forward it and forget.
          fiber.Root.GetComponent<ProcessInnerSender>().Send(actorId, (IMessage)message);
          return;
      }

      Relay().Coroutine();

      async ETTask Relay()
      {
          IRequest request = (IRequest)message;
          int originalRpcId = request.RpcId;

          // Nothing here may throw, because this is only relaying the message
          // (hence needException is passed as false).
          IResponse answer = await fiber.Root.GetComponent<ProcessInnerSender>().Call(actorId, request, false);

          // NOTE(review): the original author's comment states the response is disposed by
          // ProcessInnerSender once this coroutine finishes, yet it is also disposed
          // explicitly below - confirm this cannot dispose twice.
          actorId.Process = senderProcess;
          answer.RpcId = originalRpcId;
          self.Send(actorId, answer);
          ((MessageObject)answer).Dispose();
      }
  }
  /// <summary>
  /// Error callback of the network service: records the error code on the matching
  /// child session and disposes it. Does nothing when no such session exists.
  /// </summary>
  /// <param name="self">The owning component.</param>
  /// <param name="channelId">Id of the child session that failed.</param>
  /// <param name="error">Error code stored on the session before disposal.</param>
  private static void OnError(this ProcessOuterSender self, long channelId, int error)
  {
      Session failed = self.GetChild<Session>(channelId);
      if (failed != null)
      {
          failed.Error = error;
          failed.Dispose();
      }
  }
  /// <summary>
  /// Accept callback of the network service: registers a child session under
  /// <paramref name="channelId"/> and records the remote endpoint on it.
  /// </summary>
  /// <param name="self">The owning component.</param>
  /// <param name="channelId">
  /// Session id; per the original author's note it is generated by CreateAcceptChannelId.
  /// </param>
  /// <param name="ipEndPoint">Remote endpoint of the accepted connection.</param>
  private static void OnAccept(this ProcessOuterSender self, long channelId, IPEndPoint ipEndPoint)
  {
      Session accepted = self.AddChildWithId<Session, AService>(channelId, self.AService);
      accepted.RemoteAddress = ipEndPoint;

      // Idle checking is currently disabled:
      //session.AddComponent<SessionIdleCheckerComponent, int, int, int>(NetThreadComponent.checkInteral, NetThreadComponent.recvMaxIdleTime, NetThreadComponent.sendMaxIdleTime);
  }
  /// <summary>
  /// Creates an outgoing session: registers a child session under
  /// <paramref name="channelId"/>, stores the remote endpoint on it, and asks the
  /// network service to create the matching channel.
  /// </summary>
  /// <param name="self">The owning component.</param>
  /// <param name="channelId">Id used both for the child session and the service channel.</param>
  /// <param name="ipEndPoint">Remote endpoint to connect to.</param>
  /// <returns>The newly created session.</returns>
  private static Session CreateInner(this ProcessOuterSender self, long channelId, IPEndPoint ipEndPoint)
  {
      Session created = self.AddChildWithId<Session, AService>(channelId, self.AService);
      created.RemoteAddress = ipEndPoint;

      self.AService.Create(channelId, created.RemoteAddress);

      // Ping and idle checking are currently disabled:
      //session.AddComponent<InnerPingComponent>();
      //session.AddComponent<SessionIdleCheckerComponent, int, int, int>(NetThreadComponent.checkInteral, NetThreadComponent.recvMaxIdleTime, NetThreadComponent.sendMaxIdleTime);
      return created;
  }
  /// <summary>
  /// Returns the session registered under <paramref name="channelId"/>, creating it
  /// from the start-process configuration when none exists yet.
  /// </summary>
  /// <param name="self">The owning component.</param>
  /// <param name="channelId">
  /// Session id; per the original author's note this is the target process number
  /// (it is cast to <c>int</c> for the configuration lookup).
  /// </param>
  /// <returns>The existing or newly created session.</returns>
  private static Session Get(this ProcessOuterSender self, long channelId)
  {
      Session existing = self.GetChild<Session>(channelId);
      if (existing == null)
      {
          IPEndPoint target = StartProcessConfigCategory.Instance.Get((int) channelId).IPEndPoint;
          return self.CreateInner(channelId, target);
      }

      return existing;
  }
  /// <summary>
  /// Completes the pending call whose rpc id matches <paramref name="response"/>.
  /// The entry is removed from <c>requestCallback</c>; a response with no pending
  /// entry is ignored.
  /// </summary>
  /// <param name="self">The owning component.</param>
  /// <param name="response">Response received from the remote process.</param>
  private static void HandleIActorResponse(this ProcessOuterSender self, IResponse response)
  {
      if (self.requestCallback.Remove(response.RpcId, out MessageSenderStruct pending))
      {
          Run(pending, response);
      }
  }
  /// <summary>
  /// Resolves a pending call with <paramref name="response"/>: a message-timeout error
  /// always becomes an <c>RpcException</c>; other errors become an <c>RpcException</c>
  /// only when the caller asked for exceptions and
  /// <c>ErrorCore.IsRpcNeedThrowException</c> agrees; otherwise the response is
  /// delivered as the result.
  /// </summary>
  /// <param name="self">The pending call to complete.</param>
  /// <param name="response">The response that arrived for it.</param>
  private static void Run(MessageSenderStruct self, IResponse response)
  {
      if (response.Error == ErrorCore.ERR_MessageTimeout)
      {
          // Timeout is reported as an exception regardless of NeedException.
          self.SetException(new RpcException(response.Error, $"Rpc error: request, 注意Actor消息超时,请注意查看是否死锁或者没有reply: actorId: {self.ActorId} {self.RequestType.FullName}, response: {response}"));
      }
      else if (self.NeedException && ErrorCore.IsRpcNeedThrowException(response.Error))
      {
          self.SetException(new RpcException(response.Error, $"Rpc error: actorId: {self.ActorId} request: {self.RequestType.FullName}, response: {response}"));
      }
      else
      {
          self.SetResult(response);
      }
  }
  /// <summary>
  /// Sends a one-way message to an actor living in another process.
  /// </summary>
  /// <param name="self">The owning component.</param>
  /// <param name="actorId">Target actor; must not be the default value and must not be in this process.</param>
  /// <param name="message">Message to send; must be a <c>MessageObject</c>.</param>
  /// <exception cref="InvalidCastException">
  /// <paramref name="message"/> is not a <c>MessageObject</c>.
  /// </exception>
  /// <exception cref="Exception">
  /// Thrown by <c>SendInner</c> when <paramref name="actorId"/> is the default value or
  /// targets the current process.
  /// </exception>
  public static void Send(this ProcessOuterSender self, ActorId actorId, IMessage message)
  {
      // Direct cast rather than `as`: a message of the wrong type now fails here with a
      // clear InvalidCastException instead of being silently turned into null and
      // handed to the session.
      self.SendInner(actorId, (MessageObject)message);
  }
  /// <summary>
  /// Validates the target, looks up (or creates) the session for the target process and
  /// sends <paramref name="message"/> over it. Before sending, the actor id's process
  /// field is overwritten with the current process.
  /// </summary>
  /// <param name="self">The owning component.</param>
  /// <param name="actorId">Target actor.</param>
  /// <param name="message">Message handed to the session.</param>
  /// <exception cref="Exception">
  /// <paramref name="actorId"/> is the default value, or it targets the current process.
  /// </exception>
  private static void SendInner(this ProcessOuterSender self, ActorId actorId, MessageObject message)
  {
      if (actorId == default)
      {
          throw new Exception($"actor id is 0: {message}");
      }

      Fiber fiber = self.Fiber();

      // Sending to the same process through this component is an error.
      bool sameProcess = actorId.Process == fiber.Process;
      if (sameProcess)
      {
          throw new Exception($"actor is the same process: {fiber.Process} {actorId.Process}");
      }

      StartProcessConfig targetConfig = StartProcessConfigCategory.Instance.Get(actorId.Process);
      Session link = self.Get(targetConfig.Id);

      actorId.Process = fiber.Process;
      link.Send(actorId, message);
  }
  /// <summary>
  /// Increments the component's rpc counter and returns the new value.
  /// </summary>
  /// <param name="self">The owning component.</param>
  /// <returns>The rpc id to use for the next call.</returns>
  private static int GetRpcId(this ProcessOuterSender self)
  {
      self.RpcId += 1;
      return self.RpcId;
  }
  /// <summary>
  /// Sends a request to an actor in another process and waits for its response.
  /// A timeout coroutine completes the call after <c>ProcessOuterSender.TIMEOUT_TIME</c>
  /// if no response has arrived by then.
  /// </summary>
  /// <param name="self">The owning component.</param>
  /// <param name="actorId">Target actor; must not be the default value and must not be in this process.</param>
  /// <param name="iRequest">Request to send; its <c>RpcId</c> is overwritten with a fresh id. Must be a <c>MessageObject</c>.</param>
  /// <param name="needException">
  /// When true, a timeout surfaces as an exception; when false, it is delivered as a
  /// response carrying <c>ErrorCore.ERR_Timeout</c>.
  /// </param>
  /// <returns>The response received from the remote actor, or the synthesized timeout response.</returns>
  /// <exception cref="Exception">
  /// <paramref name="actorId"/> is the default value, the send fails, or the call times
  /// out while <paramref name="needException"/> is true.
  /// </exception>
  public static async ETTask<IResponse> Call(this ProcessOuterSender self, ActorId actorId, IRequest iRequest, bool needException = true)
  {
      if (actorId == default)
      {
          throw new Exception($"actor id is 0: {iRequest}");
      }

      Fiber fiber = self.Fiber();

      int rpcId = self.GetRpcId();
      iRequest.RpcId = rpcId;

      Type requestType = iRequest.GetType();
      MessageSenderStruct messageSenderStruct = new(actorId, requestType, needException);
      self.requestCallback.Add(rpcId, messageSenderStruct);

      try
      {
          // Direct cast rather than `as`, so a request of the wrong type fails loudly
          // instead of being sent as null.
          self.SendInner(actorId, (MessageObject)iRequest);
      }
      catch
      {
          // The timeout coroutine has not been started yet, so nothing else would ever
          // remove this entry if the send fails.
          self.requestCallback.Remove(rpcId);
          throw;
      }

      async ETTask Timeout()
      {
          await fiber.Root.GetComponent<TimerComponent>().WaitAsync(ProcessOuterSender.TIMEOUT_TIME);

          // If the entry is already gone, the response arrived first.
          if (!self.requestCallback.Remove(rpcId, out MessageSenderStruct action))
          {
              return;
          }

          if (needException)
          {
              action.SetException(new Exception($"actor sender timeout: {requestType.FullName}"));
          }
          else
          {
              IResponse timeoutResponse = MessageHelper.CreateResponse(requestType, rpcId, ErrorCore.ERR_Timeout);
              action.SetResult(timeoutResponse);
          }
      }

      Timeout().Coroutine();

      long beginTime = TimeInfo.Instance.ServerFrameTime();

      IResponse response = await messageSenderStruct.Wait();

      long endTime = TimeInfo.Instance.ServerFrameTime();

      // Warn about slow calls.
      long costTime = endTime - beginTime;
      if (costTime > 200)
      {
          Log.Warning($"actor rpc time > 200: {costTime} {requestType.FullName}");
      }

      return response;
  }
  209. }
  /// <summary>
  /// Scene component holding the state used to exchange actor messages with other
  /// processes: the network service, the rpc id counter and the table of calls that
  /// are still waiting for a response.
  /// </summary>
  [ComponentOf(typeof(Scene))]
  public class ProcessOuterSender: Entity, IAwake<IPEndPoint>, IUpdate, IDestroy
  {
      /// <summary>
      /// Delay passed to the timer before a pending call is timed out
      /// (presumably milliseconds, i.e. 40 seconds - confirm against TimerComponent).
      /// </summary>
      public const long TIMEOUT_TIME = 40 * 1000;

      /// <summary>Counter from which rpc ids are generated; incremented for each call.</summary>
      public int RpcId;

      /// <summary>Calls still waiting for a response, keyed by rpc id.</summary>
      public readonly Dictionary<int, MessageSenderStruct> requestCallback = new Dictionary<int, MessageSenderStruct>();

      /// <summary>Network service created during Awake.</summary>
      public AService AService;

      /// <summary>Protocol that selects which service Awake creates; defaults to KCP.</summary>
      public NetworkProtocol InnerProtocol = NetworkProtocol.KCP;
  }
  219. }