Преглед изворни кода

再次简化消息池,除了网络部分序列化后自动回收消息以及反序列化从池中取消息,其它任何地方用不用池,回不回收都让用户决定
还是参考C2G_Ping

tanghai пре 2 година
родитељ
комит
e53fee530d

+ 3 - 3
Unity/Assets/Scripts/Core/Fiber/Module/Actor/MessageHelper.cs

@@ -4,12 +4,12 @@ namespace ET
 {
     public static class MessageHelper
     {
-        public static IResponse CreateResponse(IRequest iRequest, int error)
+        public static IResponse CreateResponse(Type requestType, int rpcId, int error)
         {
-            Type responseType = OpcodeType.Instance.GetResponseType(iRequest.GetType());
+            Type responseType = OpcodeType.Instance.GetResponseType(requestType);
             IResponse response = (IResponse)ObjectPool.Instance.Fetch(responseType);
             response.Error = error;
-            response.RpcId = iRequest.RpcId;
+            response.RpcId = rpcId;
             return response;
         }
     }

+ 3 - 16
Unity/Assets/Scripts/Core/Fiber/Module/Actor/MessageSenderStruct.cs

@@ -8,22 +8,17 @@ namespace ET
     {
         public ActorId ActorId { get; }
         
-        public IRequest Request { get; }
+        public Type RequestType { get; }
         
         private readonly ETTask<IResponse> tcs;
 
-        private readonly bool isFromPool;
-
         public bool NeedException { get; }
         
-        public MessageSenderStruct(ActorId actorId, IRequest iRequest, bool needException)
+        public MessageSenderStruct(ActorId actorId, Type requestType, bool needException)
         {
             this.ActorId = actorId;
             
-            this.Request = iRequest;
-            MessageObject messageObject = (MessageObject)this.Request;
-            this.isFromPool = messageObject.IsFromPool;
-            messageObject.IsFromPool = false;
+            this.RequestType = requestType;
             
             this.tcs = ETTask<IResponse>.Create(true);
             this.NeedException = needException;
@@ -31,19 +26,11 @@ namespace ET
         
         public void SetResult(IResponse response)
         {
-            MessageObject messageObject = (MessageObject)this.Request;
-            messageObject.IsFromPool = this.isFromPool;
-            messageObject.Dispose();
-            
             this.tcs.SetResult(response);
         }
         
         public void SetException(Exception exception)
         {
-            MessageObject messageObject = (MessageObject)this.Request;
-            messageObject.IsFromPool = this.isFromPool;
-            messageObject.Dispose();
-            
             this.tcs.SetException(exception);
         }
 

+ 8 - 8
Unity/Assets/Scripts/Core/Fiber/Module/Actor/ProcessInnerSenderSystem.cs

@@ -52,10 +52,9 @@ namespace ET
                 Log.Warning($"actor not found mailbox, from: {actorId} current: {fiber.Address} {message}");
                 if (message is IRequest request)
                 {
-                    IResponse resp = MessageHelper.CreateResponse(request, ErrorCore.ERR_NotFoundActor);
+                    IResponse resp = MessageHelper.CreateResponse(request.GetType(), request.RpcId, ErrorCore.ERR_NotFoundActor);
                     self.Reply(actorId.Address, resp);
                 }
-                message.Dispose();
                 return;
             }
             mailBoxComponent.Add(actorId.Address, message);
@@ -74,13 +73,13 @@ namespace ET
         {
             if (response.Error == ErrorCore.ERR_MessageTimeout)
             {
-                self.SetException(new RpcException(response.Error, $"Rpc error: request, 注意Actor消息超时,请注意查看是否死锁或者没有reply: actorId: {self.ActorId} {self.Request}, response: {response}"));
+                self.SetException(new RpcException(response.Error, $"Rpc error: request, 注意Actor消息超时,请注意查看是否死锁或者没有reply: actorId: {self.ActorId} {self.RequestType.FullName}, response: {response}"));
                 return;
             }
 
             if (self.NeedException && ErrorCore.IsRpcNeedThrowException(response.Error))
             {
-                self.SetException(new RpcException(response.Error, $"Rpc error: actorId: {self.ActorId} request: {self.Request}, response: {response}"));
+                self.SetException(new RpcException(response.Error, $"Rpc error: actorId: {self.ActorId} request: {self.RequestType.FullName}, response: {response}"));
                 return;
             }
 
@@ -157,7 +156,8 @@ namespace ET
                 throw new Exception($"actor inner process diff: {actorId.Process} {fiber.Process}");
             }
 
-            MessageSenderStruct messageSenderStruct = new(actorId, iRequest, needException);
+            Type requestType = iRequest.GetType();
+            MessageSenderStruct messageSenderStruct = new(actorId, requestType, needException);
             self.requestCallback.Add(rpcId, messageSenderStruct);
             
             self.SendInner(actorId, (MessageObject)iRequest);
@@ -174,11 +174,11 @@ namespace ET
                 
                 if (needException)
                 {
-                    action.SetException(new Exception($"actor sender timeout: {iRequest}"));
+                    action.SetException(new Exception($"actor sender timeout: {requestType.FullName}"));
                 }
                 else
                 {
-                    IResponse response = MessageHelper.CreateResponse(iRequest, ErrorCore.ERR_Timeout);
+                    IResponse response = MessageHelper.CreateResponse(requestType, rpcId, ErrorCore.ERR_Timeout);
                     action.SetResult(response);
                 }
             }
@@ -194,7 +194,7 @@ namespace ET
             long costTime = endTime - beginTime;
             if (costTime > 200)
             {
-                Log.Warning($"actor rpc time > 200: {costTime} {iRequest}");
+                Log.Warning($"actor rpc time > 200: {costTime} {requestType.FullName}");
             }
             
             return response;

+ 1 - 1
Unity/Assets/Scripts/Hotfix/Client/Demo/NetClient/Ping/PingComponentSystem.cs

@@ -34,7 +34,7 @@ namespace ET.Client
                     }
                     long time1 = TimeInfo.Instance.ClientNow();
                     // C2G_Ping不需要调用dispose,Call中会判断,如果用了对象池会自动回收
-                    C2G_Ping c2GPing = C2G_Ping.Create();
+                    C2G_Ping c2GPing = C2G_Ping.Create(true);
                     // 这里response要用using才能回收到池,默认不回收
                     using G2C_Ping response = await session.Call(c2GPing) as G2C_Ping;
 

+ 0 - 1
Unity/Assets/Scripts/Hotfix/Client/LockStep/Room2C_CheckHashFailHandler.cs

@@ -18,7 +18,6 @@ namespace ET.Client
                 Log.Debug($"check hash fail, client: {message.Frame} {clientWorld.ToJson()}");
             }
             
-            message.Dispose();
             await ETTask.CompletedTask;
         }
     }

+ 0 - 2
Unity/Assets/Scripts/Hotfix/Server/Module/ActorLocation/MessageLocationHandler.cs

@@ -9,7 +9,6 @@ namespace ET.Server
 
         public async ETTask Handle(Entity entity, Address fromAddress, MessageObject actorMessage)
         {
-            using MessageObject _ = actorMessage;
             Fiber fiber = entity.Fiber();
             if (actorMessage is not Message message)
             {
@@ -52,7 +51,6 @@ namespace ET.Server
         {
             try
             {
-                using MessageObject _ = actorMessage;
                 Fiber fiber = entity.Fiber();
                 if (actorMessage is not Request request)
                 {

+ 8 - 7
Unity/Assets/Scripts/Hotfix/Server/Module/ActorLocation/MessageLocationSenderComponentSystem.cs

@@ -219,7 +219,8 @@ namespace ET.Server
             messageLocationSender.LastSendOrRecvTime = TimeInfo.Instance.ServerNow();
             
             Scene root = self.Root();
-            
+
+            Type requestType = iRequest.GetType();
             while (true)
             {
                 if (messageLocationSender.ActorId == default)
@@ -233,13 +234,13 @@ namespace ET.Server
 
                 if (messageLocationSender.ActorId == default)
                 {
-                    return MessageHelper.CreateResponse(iRequest, ErrorCore.ERR_NotFoundActor);
+                    return MessageHelper.CreateResponse(requestType, rpcId, ErrorCore.ERR_NotFoundActor);
                 }
                 IResponse response = await root.GetComponent<MessageSender>().Call(messageLocationSender.ActorId, rpcId, iRequest, needException: false);
                 
                 if (messageLocationSender.InstanceId != instanceId)
                 {
-                    throw new RpcException(ErrorCore.ERR_ActorLocationSenderTimeout3, $"{iRequest}");
+                    throw new RpcException(ErrorCore.ERR_ActorLocationSenderTimeout3, $"{requestType.FullName}");
                 }
                 
                 switch (response.Error)
@@ -250,7 +251,7 @@ namespace ET.Server
                         ++failTimes;
                         if (failTimes > 20)
                         {
-                            Log.Debug($"actor send message fail, actorid: {messageLocationSender.Id} {iRequest}");
+                            Log.Debug($"actor send message fail, actorid: {messageLocationSender.Id} {requestType.FullName}");
                             
                             // 这里删除actor,后面等待发送的消息会判断InstanceId,InstanceId不一致返回ERR_NotFoundActor
                             self.Remove(messageLocationSender.Id);
@@ -261,7 +262,7 @@ namespace ET.Server
                         await root.GetComponent<TimerComponent>().WaitAsync(500);
                         if (messageLocationSender.InstanceId != instanceId)
                         {
-                            throw new RpcException(ErrorCore.ERR_ActorLocationSenderTimeout4, $"{iRequest}");
+                            throw new RpcException(ErrorCore.ERR_ActorLocationSenderTimeout4, $"{requestType.FullName}");
                         }
 
                         messageLocationSender.ActorId = default;
@@ -269,13 +270,13 @@ namespace ET.Server
                     }
                     case ErrorCore.ERR_MessageTimeout:
                     {
-                        throw new RpcException(response.Error, $"{iRequest}");
+                        throw new RpcException(response.Error, $"{requestType.FullName}");
                     }
                 }
 
                 if (ErrorCore.IsRpcNeedThrowException(response.Error))
                 {
-                    throw new RpcException(response.Error, $"Message: {response.Message} Request: {iRequest}");
+                    throw new RpcException(response.Error, $"Message: {response.Message} Request: {requestType.FullName}");
                 }
 
                 return response;

+ 7 - 6
Unity/Assets/Scripts/Hotfix/Server/Module/Message/ProcessOuterSenderSystem.cs

@@ -152,13 +152,13 @@ namespace ET.Server
         {
             if (response.Error == ErrorCore.ERR_MessageTimeout)
             {
-                self.SetException(new RpcException(response.Error, $"Rpc error: request, 注意Actor消息超时,请注意查看是否死锁或者没有reply: actorId: {self.ActorId} {self.Request}, response: {response}"));
+                self.SetException(new RpcException(response.Error, $"Rpc error: request, 注意Actor消息超时,请注意查看是否死锁或者没有reply: actorId: {self.ActorId} {self.RequestType.FullName}, response: {response}"));
                 return;
             }
 
             if (self.NeedException && ErrorCore.IsRpcNeedThrowException(response.Error))
             {
-                self.SetException(new RpcException(response.Error, $"Rpc error: actorId: {self.ActorId} request: {self.Request}, response: {response}"));
+                self.SetException(new RpcException(response.Error, $"Rpc error: actorId: {self.ActorId} request: {self.RequestType.FullName}, response: {response}"));
                 return;
             }
 
@@ -207,7 +207,8 @@ namespace ET.Server
 
             iRequest.RpcId = rpcId;
 
-            MessageSenderStruct messageSenderStruct = new(actorId, iRequest, needException);
+            Type requestType = iRequest.GetType();
+            MessageSenderStruct messageSenderStruct = new(actorId, requestType, needException);
             self.requestCallback.Add(rpcId, messageSenderStruct);
             
             self.SendInner(actorId, iRequest as MessageObject);
@@ -222,11 +223,11 @@ namespace ET.Server
                 
                 if (needException)
                 {
-                    action.SetException(new Exception($"actor sender timeout: {iRequest}"));
+                    action.SetException(new Exception($"actor sender timeout: {requestType.FullName}"));
                 }
                 else
                 {
-                    IResponse response = ET.MessageHelper.CreateResponse(iRequest, ErrorCore.ERR_Timeout);
+                    IResponse response = MessageHelper.CreateResponse(requestType, rpcId, ErrorCore.ERR_Timeout);
                     action.SetResult(response);
                 }
             }
@@ -242,7 +243,7 @@ namespace ET.Server
             long costTime = endTime - beginTime;
             if (costTime > 200)
             {
-                Log.Warning($"actor rpc time > 200: {costTime} {iRequest}");
+                Log.Warning($"actor rpc time > 200: {costTime} {requestType.FullName}");
             }
 
             return response;

+ 1 - 3
Unity/Assets/Scripts/Hotfix/Share/Module/Actor/MailBoxType_OrderedMessageHandler.cs

@@ -17,7 +17,6 @@
             Fiber fiber = mailBoxComponent.Fiber();
             if (fiber.IsDisposed)
             {
-				messageObject.Dispose();
                 return;
             }
 
@@ -28,10 +27,9 @@
                 {
                     if (messageObject is IRequest request)
                     {
-                        IResponse resp = MessageHelper.CreateResponse(request, ErrorCore.ERR_NotFoundActor);
+                        IResponse resp = MessageHelper.CreateResponse(request.GetType(), request.RpcId, ErrorCore.ERR_NotFoundActor);
                         mailBoxComponent.Root().GetComponent<ProcessInnerSender>().Reply(args.FromAddress, resp);
                     }
-                    messageObject.Dispose();
                     return;
                 }
                 await MessageDispatcher.Instance.Handle(mailBoxComponent.Parent, args.FromAddress, messageObject);

+ 0 - 2
Unity/Assets/Scripts/Hotfix/Share/Module/Actor/MessageHandler.cs

@@ -9,7 +9,6 @@ namespace ET
 
         public async ETTask Handle(Entity entity, Address fromAddress, MessageObject actorMessage)
         {
-            using MessageObject _ = actorMessage;
             if (actorMessage is not Message msg)
             {
                 Log.Error($"消息类型转换错误: {actorMessage.GetType().FullName} to {typeof (Message).Name}");
@@ -52,7 +51,6 @@ namespace ET
         {
             try
             {
-                using MessageObject _ = actorMessage;
                 Fiber fiber = entity.Fiber();
                 if (actorMessage is not Request request)
                 {

+ 5 - 5
Unity/Assets/Scripts/Model/Share/Module/Message/MessageSessionHandler.cs

@@ -13,20 +13,19 @@ namespace ET
 
         private async ETTask HandleAsync(Session session, object message)
         {
-            using Message msg = message as Message;
             if (message == null)
             {
-                Log.Error($"消息类型转换错误: {msg.GetType().FullName} to {typeof (Message).Name}");
+                Log.Error($"消息类型转换错误: {message.GetType().FullName} to {typeof (Message).Name}");
                 return;
             }
 
             if (session.IsDisposed)
             {
-                Log.Error($"session disconnect {msg}");
+                Log.Error($"session disconnect {message}");
                 return;
             }
 
-            await this.Run(session, msg);
+            await this.Run(session, (Message)message);
         }
 
         public Type GetMessageType()
@@ -54,7 +53,7 @@ namespace ET
         {
             try
             {
-                using Request request = message as Request;
+                Request request = message as Request;
                 if (request == null)
                 {
                     throw new Exception($"消息类型转换错误: {message.GetType().FullName} to {typeof (Request).FullName}");
@@ -63,6 +62,7 @@ namespace ET
                 int rpcId = request.RpcId;
                 long instanceId = session.InstanceId;
 
+                // 这里用using很安全,因为后面是session发送出去了
                 using Response response = ObjectPool.Instance.Fetch<Response>();
                 try
                 {

+ 8 - 22
Unity/Assets/Scripts/Model/Share/Module/Message/Session.cs

@@ -7,38 +7,24 @@ namespace ET
 {
     public readonly struct RpcInfo
     {
-        public IRequest Request { get; }
+        public Type RequestType { get; }
         
         private readonly ETTask<IResponse> tcs;
 
-        private readonly bool isFromPool;
-
-        public RpcInfo(IRequest request)
+        public RpcInfo(Type requestType)
         {
-            this.Request = request;
+            this.RequestType = requestType;
             
-            MessageObject messageObject = (MessageObject)this.Request;
-            this.isFromPool = messageObject.IsFromPool;
-            messageObject.IsFromPool = false;
-
             this.tcs = ETTask<IResponse>.Create(true);
         }
 
         public void SetResult(IResponse response)
         {
-            MessageObject messageObject = (MessageObject)this.Request;
-            messageObject.IsFromPool = this.isFromPool;
-            messageObject.Dispose();
-
             this.tcs.SetResult(response);
         }
 
         public void SetException(Exception exception)
         {
-            MessageObject messageObject = (MessageObject)this.Request;
-            messageObject.IsFromPool = this.isFromPool;
-            messageObject.Dispose();
-
             this.tcs.SetException(exception);
         }
 
@@ -82,7 +68,7 @@ namespace ET
         
         public static void OnResponse(this Session self, IResponse response)
         {
-            if (!self.requestCallbacks.Remove(response.RpcId, out var action))
+            if (!self.requestCallbacks.Remove(response.RpcId, out RpcInfo action))
             {
                 return;
             }
@@ -92,7 +78,7 @@ namespace ET
         public static async ETTask<IResponse> Call(this Session self, IRequest request, ETCancellationToken cancellationToken)
         {
             int rpcId = ++self.RpcId;
-            RpcInfo rpcInfo = new(request);
+            RpcInfo rpcInfo = new(request.GetType());
             self.requestCallbacks[rpcId] = rpcInfo;
             request.RpcId = rpcId;
 
@@ -105,7 +91,7 @@ namespace ET
                     return;
                 }
 
-                Type responseType = OpcodeType.Instance.GetResponseType(action.Request.GetType());
+                Type responseType = OpcodeType.Instance.GetResponseType(action.RequestType);
                 IResponse response = (IResponse) Activator.CreateInstance(responseType);
                 response.Error = ErrorCore.ERR_Cancel;
                 action.SetResult(response);
@@ -127,7 +113,7 @@ namespace ET
         public static async ETTask<IResponse> Call(this Session self, IRequest request, int time = 0)
         {
             int rpcId = ++self.RpcId;
-            RpcInfo rpcInfo = new(request);
+            RpcInfo rpcInfo = new(request.GetType());
             self.requestCallbacks[rpcId] = rpcInfo;
             request.RpcId = rpcId;
             self.Send(request);
@@ -147,7 +133,7 @@ namespace ET
                         return;
                     }
                     
-                    action.SetException(new Exception($"session call timeout: {request} {time}"));
+                    action.SetException(new Exception($"session call timeout: {action.RequestType.FullName} {time}"));
                 }
                 
                 Timeout().Coroutine();