ソースを参照

调整网络层MemoryBuffer池的处理方式,放回纤程,减少锁竞争

tanghai 2 年 前
コミット
b7811ef011

+ 37 - 16
Unity/Assets/Scripts/Core/Network/AService.cs

@@ -1,4 +1,5 @@
 using System;
+using System.Collections.Generic;
 using System.IO;
 using System.Net;
 
@@ -14,28 +15,48 @@ namespace ET
         
         public ServiceType ServiceType { get; protected set; }
         
-        private (MessageObject Message, MemoryBuffer MemoryStream) lastMessageInfo;
-        
-        // 缓存上一个发送的消息,这样广播消息的时候省掉多次序列化
-        public MemoryBuffer Fetch(MessageObject message)
+        public const int MaxCacheBufferSize = 1024;
+		
+        private readonly Queue<MemoryBuffer> pool = new();
+
+        public MemoryBuffer Fetch(int size = 0)
         {
-            // 这里虽然用了对象池,但是相邻的两个消息不会是池中来的同一个消息,因为lastMessageInfo中的消息还没回收
-            if (ReferenceEquals(message, this.lastMessageInfo.Message))
+            if (size > MaxCacheBufferSize)
             {
-                Log.Debug($"message serialize cache: {message.GetType().FullName}");
-                return lastMessageInfo.MemoryStream;
+                return new MemoryBuffer(size);
             }
             
-            // 回收上一个消息跟MemoryBuffer
-            ObjectPool.Instance.Recycle(this.lastMessageInfo.Message);
-            NetServices.Instance.RecycleMemoryBuffer(this.lastMessageInfo.MemoryStream);
-
-            MemoryBuffer stream = NetServices.Instance.FetchMemoryBuffer();
-            MessageSerializeHelper.MessageToStream(stream, message);
-            this.lastMessageInfo = (message, stream);
-            return stream;
+            if (size < MaxCacheBufferSize)
+            {
+                size = MaxCacheBufferSize;
+            }
+            
+            if (this.pool.Count == 0)
+            {
+                return new MemoryBuffer(size);
+            }
+            
+            return pool.Dequeue();
         }
 
+        public void Recycle(MemoryBuffer memoryBuffer)
+        {
+            if (memoryBuffer.Capacity > 1024)
+            {
+                return;
+            }
+            
+            if (this.pool.Count > 10) // 这里不需要太大,其实Kcp跟Tcp,这里1就足够了
+            {
+                return;
+            }
+            
+            memoryBuffer.Seek(0, SeekOrigin.Begin);
+            memoryBuffer.SetLength(0);
+            
+            this.pool.Enqueue(memoryBuffer);
+        }
+        
         public AService()
         {
             NetServices.Instance.Add(this);

+ 21 - 29
Unity/Assets/Scripts/Core/Network/KChannel.cs

@@ -7,21 +7,13 @@ using System.Runtime.InteropServices;
 
 namespace ET
 {
-
-	
-	public struct KcpWaitSendMessage
-	{
-		public ActorId ActorId;
-		public MessageObject Message;
-	}
-	
 	public class KChannel : AChannel
 	{
 		private readonly KService Service;
 
 		private Kcp kcp { get; set; }
 
-		private readonly Queue<KcpWaitSendMessage> waitSendMessages = new Queue<KcpWaitSendMessage>();
+		private readonly Queue<ActorMessageInfo> waitSendMessages = new Queue<ActorMessageInfo>();
 		
 		public readonly uint CreateTime;
 
@@ -44,10 +36,6 @@ namespace ET
 
 		public string RealAddress { get; set; }
 		
-		private const int maxPacketSize = 10000;
-
-		private readonly MemoryBuffer ms = new MemoryBuffer(maxPacketSize);
-
 		private MemoryBuffer readMemory;
 		private int needReadSplitCount;
 
@@ -177,8 +165,8 @@ namespace ET
 					break;
 				}
 				
-				KcpWaitSendMessage buffer = this.waitSendMessages.Dequeue();
-				this.Send(buffer.ActorId, buffer.Message);
+				ActorMessageInfo buffer = this.waitSendMessages.Dequeue();
+				this.Send(buffer.ActorId, buffer.MessageObject);
 			}
 		}
 
@@ -264,7 +252,7 @@ namespace ET
 			this.Service.AddToUpdate(nextUpdateTime, this.Id);
 		}
 
-		public void HandleRecv(byte[] date, int offset, int length)
+		public unsafe void HandleRecv(byte[] date, int offset, int length)
 		{
 			if (this.IsDisposed)
 			{
@@ -273,7 +261,6 @@ namespace ET
 
 			this.kcp.Input(date, offset, length);
 			this.Service.AddToUpdate(0, this.Id);
-
 			while (true)
 			{
 				if (this.IsDisposed)
@@ -318,11 +305,12 @@ namespace ET
 				}
 				else
 				{
-					this.readMemory = this.ms;
+					this.readMemory = this.Service.Fetch(n);
 					this.readMemory.SetLength(n);
 					this.readMemory.Seek(0, SeekOrigin.Begin);
 					
 					byte[] buffer = readMemory.GetBuffer();
+					
 					int count = this.kcp.Recv(buffer, 0, n);
 					if (n != count)
 					{
@@ -336,13 +324,12 @@ namespace ET
 						if (headInt == 0)
 						{
 							this.needReadSplitCount = BitConverter.ToInt32(readMemory.GetBuffer(), 4);
-							if (this.needReadSplitCount <= maxPacketSize)
+							if (this.needReadSplitCount <= AService.MaxCacheBufferSize)
 							{
 								Log.Error($"kchannel read error3: {this.needReadSplitCount} {this.LocalConn} {this.RemoteConn}");
 								this.OnError(ErrorCore.ERR_KcpSplitCountError);
 								return;
 							}
-							this.readMemory = new MemoryBuffer(this.needReadSplitCount);
 							this.readMemory.SetLength(this.needReadSplitCount);
 							this.readMemory.Seek(0, SeekOrigin.Begin);
 							continue;
@@ -360,9 +347,10 @@ namespace ET
 						this.readMemory.Seek(Packet.OpcodeLength, SeekOrigin.Begin);
 						break;
 				}
-				MemoryBuffer mem = this.readMemory;
+				MemoryBuffer memoryBuffer = this.readMemory;
 				this.readMemory = null;
-				this.OnRead(mem);
+				this.OnRead(memoryBuffer);
+				this.Service.Recycle(memoryBuffer);
 			}
 		}
 
@@ -424,7 +412,7 @@ namespace ET
 			int count = (int) (memoryStream.Length - memoryStream.Position);
 
 			// 超出maxPacketSize需要分片
-			if (count <= maxPacketSize)
+			if (count <= AService.MaxCacheBufferSize)
 			{
 				this.kcp.Send(memoryStream.GetBuffer(), (int)memoryStream.Position, count);
 			}
@@ -441,7 +429,7 @@ namespace ET
 				{
 					int leftCount = count - alreadySendCount;
 					
-					int sendCount = leftCount < maxPacketSize? leftCount: maxPacketSize;
+					int sendCount = leftCount < AService.MaxCacheBufferSize? leftCount: AService.MaxCacheBufferSize;
 					
 					this.kcp.Send(memoryStream.GetBuffer(), (int)memoryStream.Position + alreadySendCount, sendCount);
 					
@@ -456,12 +444,14 @@ namespace ET
 		{
 			if (!this.IsConnected)
 			{
-				KcpWaitSendMessage kcpWaitSendMessage = new() { ActorId = actorId, Message = message };
-				this.waitSendMessages.Enqueue(kcpWaitSendMessage);
+				ActorMessageInfo actorMessageInfo = new() { ActorId = actorId, MessageObject = message };
+				this.waitSendMessages.Enqueue(actorMessageInfo);
 				return;
 			}
-			
-			MemoryBuffer memoryStream = this.Service.Fetch(message);
+
+			MemoryBuffer stream = this.Service.Fetch();
+			MessageSerializeHelper.MessageToStream(stream, message);
+			message.Dispose();
 
 			if (this.kcp == null)
 			{
@@ -490,7 +480,9 @@ namespace ET
 				return;
 			}
 
-			this.KcpSend(actorId, memoryStream);
+			this.KcpSend(actorId, stream);
+			
+			this.Service.Recycle(stream);
 		}
 		
 		private void OnRead(MemoryBuffer memoryStream)

+ 1 - 44
Unity/Assets/Scripts/Core/Network/NetServices.cs

@@ -1,8 +1,5 @@
-using System;
-using System.Collections.Concurrent;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
-using System.IO;
-using System.Net;
 using System.Threading;
 
 namespace ET
@@ -18,8 +15,6 @@ namespace ET
     {
         private readonly ConcurrentDictionary<long, AService> services = new();
         
-        private readonly ConcurrentQueue<MemoryBuffer> pool = new();
-
         private long idGenerator;
 
         public override void Dispose()
@@ -36,44 +31,6 @@ namespace ET
                 kv.Value.Dispose();
             }
         }
-     
-        public MemoryBuffer FetchMemoryBuffer()
-        {
-            MemoryBuffer memoryBuffer;
-            if (this.pool.TryDequeue(out memoryBuffer))
-            {
-                return memoryBuffer;
-            }
-
-            memoryBuffer = new(128) { IsFromPool = true };
-            return memoryBuffer;
-        }
-
-        public void RecycleMemoryBuffer(MemoryBuffer memoryBuffer)
-        {
-            if (memoryBuffer == null)
-            {
-                return;
-            }
-            
-            if (!memoryBuffer.IsFromPool)
-            {
-                return;
-            }
-            if (memoryBuffer.Capacity > 128) // 太大的不回收,GC
-            {
-                return;
-            }
-
-            if (this.pool.Count > 1000)
-            {
-                return;
-            }
-
-            memoryBuffer.SetLength(0);
-            memoryBuffer.Seek(0, SeekOrigin.Begin);
-            this.pool.Enqueue(memoryBuffer);
-        }
 
         public void Add(AService aService)
         {

+ 10 - 8
Unity/Assets/Scripts/Core/Network/PacketParser.cs

@@ -14,11 +14,11 @@ namespace ET
 		private readonly CircularBuffer buffer;
 		private int packetSize;
 		private ParserState state;
-		public AService service;
+		private readonly AService service;
 		private readonly byte[] cache = new byte[8];
 		public const int InnerPacketSizeLength = 4;
 		public const int OuterPacketSizeLength = 2;
-		public MemoryBuffer MemoryStream;
+		public MemoryBuffer MemoryBuffer;
 
 		public PacketParser(CircularBuffer buffer, AService service)
 		{
@@ -26,7 +26,7 @@ namespace ET
 			this.service = service;
 		}
 
-		public bool Parse()
+		public bool Parse(out MemoryBuffer memoryBuffer)
 		{
 			while (true)
 			{
@@ -38,6 +38,7 @@ namespace ET
 						{
 							if (this.buffer.Length < InnerPacketSizeLength)
 							{
+								memoryBuffer = null;
 								return false;
 							}
 
@@ -53,6 +54,7 @@ namespace ET
 						{
 							if (this.buffer.Length < OuterPacketSizeLength)
 							{
+								memoryBuffer = null;
 								return false;
 							}
 
@@ -72,21 +74,21 @@ namespace ET
 					{
 						if (this.buffer.Length < this.packetSize)
 						{
+							memoryBuffer = null;
 							return false;
 						}
 
-						MemoryBuffer memoryStream = new MemoryBuffer(this.packetSize);
-						this.buffer.Read(memoryStream, this.packetSize);
+						memoryBuffer = this.service.Fetch(this.packetSize);
+						this.buffer.Read(memoryBuffer, this.packetSize);
 						//memoryStream.SetLength(this.packetSize - Packet.MessageIndex);
-						this.MemoryStream = memoryStream;
 
 						if (this.service.ServiceType == ServiceType.Inner)
 						{
-							memoryStream.Seek(Packet.MessageIndex, SeekOrigin.Begin);
+							memoryBuffer.Seek(Packet.MessageIndex, SeekOrigin.Begin);
 						}
 						else
 						{
-							memoryStream.Seek(Packet.OpcodeLength, SeekOrigin.Begin);
+							memoryBuffer.Seek(Packet.OpcodeLength, SeekOrigin.Begin);
 						}
 
 						this.state = ParserState.PacketSize;

+ 14 - 8
Unity/Assets/Scripts/Core/Network/TChannel.cs

@@ -12,11 +12,11 @@ namespace ET
 	{
 		private readonly TService Service;
 		private Socket socket;
-		private SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();
-		private SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs();
+		private SocketAsyncEventArgs innArgs = new();
+		private SocketAsyncEventArgs outArgs = new();
 
-		private readonly CircularBuffer recvBuffer = new CircularBuffer();
-		private readonly CircularBuffer sendBuffer = new CircularBuffer();
+		private readonly CircularBuffer recvBuffer = new();
+		private readonly CircularBuffer sendBuffer = new();
 
 		private bool isSending;
 
@@ -98,8 +98,10 @@ namespace ET
 			{
 				throw new Exception("TChannel已经被Dispose, 不能发送消息");
 			}
-			
-			MemoryBuffer stream = this.Service.Fetch(message);
+
+			MemoryBuffer stream = this.Service.Fetch();
+			MessageSerializeHelper.MessageToStream(stream, message);
+			message.Dispose();
 
 			switch (this.Service.ServiceType)
 			{
@@ -137,6 +139,8 @@ namespace ET
 				//this.StartSend();
 				this.Service.Queue.Enqueue(new TArgs() { Op = TcpOp.StartSend, ChannelId = this.Id});
 			}
+			
+			this.Service.Recycle(stream);
 		}
 
 		public void ConnectAsync()
@@ -250,13 +254,15 @@ namespace ET
 				}
 				try
 				{
-					bool ret = this.parser.Parse();
+					bool ret = this.parser.Parse(out MemoryBuffer memoryBuffer);
 					if (!ret)
 					{
 						break;
 					}
 					
-					this.OnRead(this.parser.MemoryStream);
+					this.OnRead(memoryBuffer);
+					
+					this.Service.Recycle(memoryBuffer);
 				}
 				catch (Exception ee)
 				{

+ 28 - 24
Unity/Assets/Scripts/Core/Network/WChannel.cs

@@ -15,13 +15,11 @@ namespace ET
 
         private readonly WebSocket webSocket;
 
-        private readonly Queue<MemoryBuffer> queue = new();
+        private readonly Queue<MessageObject> queue = new();
 
         private bool isSending;
 
         private bool isConnected;
-
-        private readonly MemoryBuffer recvStream;
         
         public IPEndPoint RemoteAddress { get; set; }
 
@@ -34,7 +32,6 @@ namespace ET
             this.ChannelType = ChannelType.Accept;
             this.WebSocketContext = webSocketContext;
             this.webSocket = webSocketContext.WebSocket;
-            this.recvStream = new MemoryBuffer(ushort.MaxValue);
 
             isConnected = true;
             
@@ -51,7 +48,6 @@ namespace ET
             this.Service = service;
             this.ChannelType = ChannelType.Connect;
             this.webSocket = webSocket;
-            this.recvStream = new MemoryBuffer(ushort.MaxValue);
 
             isConnected = false;
             
@@ -91,18 +87,7 @@ namespace ET
 
         public void Send(MessageObject message)
         {
-            MemoryBuffer stream = this.Service.Fetch(message);
-            
-            switch (this.Service.ServiceType)
-            {
-                case ServiceType.Inner:
-                    break;
-                case ServiceType.Outer:
-                    stream.Seek(Packet.ActorIdLength, SeekOrigin.Begin);
-                    break;
-            }
-            
-            this.queue.Enqueue(stream);
+            this.queue.Enqueue(message);
 
             if (this.isConnected)
             {
@@ -134,10 +119,27 @@ namespace ET
                         return;
                     }
 
-                    MemoryBuffer bytes = this.queue.Dequeue();
+                    MessageObject message = this.queue.Dequeue();
+
+                    MemoryBuffer stream = this.Service.Fetch();
+                    
+                    MessageSerializeHelper.MessageToStream(stream, message);
+                    message.Dispose();
+            
+                    switch (this.Service.ServiceType)
+                    {
+                        case ServiceType.Inner:
+                            break;
+                        case ServiceType.Outer:
+                            stream.Seek(Packet.ActorIdLength, SeekOrigin.Begin);
+                            break;
+                    }
+                    
                     try
                     {
-                        await this.webSocket.SendAsync(bytes.GetMemory(), WebSocketMessageType.Binary, true, cancellationTokenSource.Token);
+                        await this.webSocket.SendAsync(stream.GetMemory(), WebSocketMessageType.Binary, true, cancellationTokenSource.Token);
+                        
+                        this.Service.Recycle(stream);
                         
                         if (this.IsDisposed)
                         {
@@ -200,11 +202,13 @@ namespace ET
                         this.OnError(ErrorCore.ERR_WebsocketMessageTooBig);
                         return;
                     }
-                    
-                    this.recvStream.SetLength(receiveCount);
-                    this.recvStream.Seek(2, SeekOrigin.Begin);
-                    Array.Copy(this.cache, 0, this.recvStream.GetBuffer(), 0, receiveCount);
-                    this.OnRead(this.recvStream);
+
+                    MemoryBuffer memoryBuffer = this.Service.Fetch(receiveCount);
+                    memoryBuffer.SetLength(receiveCount);
+                    memoryBuffer.Seek(2, SeekOrigin.Begin);
+                    Array.Copy(this.cache, 0, memoryBuffer.GetBuffer(), 0, receiveCount);
+                    this.OnRead(memoryBuffer);
+                    this.Service.Recycle(memoryBuffer);
                 }
             }
             catch (Exception e)

+ 0 - 7
Unity/Assets/Scripts/Core/Serialize/MemoryBuffer.cs

@@ -1,14 +1,12 @@
 using System;
 using System.Buffers;
 using System.IO;
-using MemoryPack;
 
 namespace ET
 {
     public class MemoryBuffer: MemoryStream, IBufferWriter<byte>
     {
         private int origin;
-        public bool IsFromPool;
         
         public MemoryBuffer()
         {
@@ -26,11 +24,6 @@ namespace ET
         {
             this.origin = index;
         }
-
-        protected override void Dispose(bool disposing)
-        {
-            this.Seek(0, SeekOrigin.Begin);
-        }
         
         public ReadOnlyMemory<byte> WrittenMemory => this.GetBuffer().AsMemory(this.origin, (int)this.Position);
 

+ 1 - 1
Unity/Assets/Scripts/Core/World/Module/Actor/ActorMessageDispatcherComponent.cs

@@ -90,7 +90,7 @@ namespace ET
             this.ActorMessageHandlers[type].Add(handler);
         }
 
-        public async ETTask Handle(Entity entity, Address fromAddress, object message)
+        public async ETTask Handle(Entity entity, Address fromAddress, MessageObject message)
         {
             List<ActorMessageDispatcherInfo> list;
             if (!this.ActorMessageHandlers.TryGetValue(message.GetType(), out list))

+ 1 - 1
Unity/Assets/Scripts/Core/World/Module/Actor/IMActorHandler.cs

@@ -4,7 +4,7 @@ namespace ET
 {
     public interface IMActorHandler
     {
-        ETTask Handle(Entity entity, Address fromAddress, object actorMessage);
+        ETTask Handle(Entity entity, Address fromAddress, MessageObject actorMessage);
         Type GetRequestType();
         Type GetResponseType();
     }

+ 4 - 1
Unity/Assets/Scripts/Hotfix/Server/Demo/Gate/MailBoxType_GateSessionHandler.cs

@@ -6,10 +6,13 @@
         public override void Handle(MailBoxInvoker args)
         {
             MailBoxComponent mailBoxComponent = args.MailBoxComponent;
+            
+            // 这里messageObject要发送出去,不能回收
             MessageObject messageObject = args.MessageObject;
+            
             if (mailBoxComponent.Parent is PlayerSessionComponent playerSessionComponent)
             {
-                playerSessionComponent.Session?.Send(messageObject as IMessage);
+                playerSessionComponent.Session?.Send(messageObject);
             }
         }
     }

+ 2 - 2
Unity/Assets/Scripts/Hotfix/Server/Module/ActorLocation/ActorMessageLocationHandler.cs

@@ -7,7 +7,7 @@ namespace ET.Server
     {
         protected abstract ETTask Run(E entity, Message message);
 
-        public async ETTask Handle(Entity entity, Address fromAddress, object actorMessage)
+        public async ETTask Handle(Entity entity, Address fromAddress, MessageObject actorMessage)
         {
             if (actorMessage is not Message message)
             {
@@ -45,7 +45,7 @@ namespace ET.Server
     {
         protected abstract ETTask Run(E unit, Request request, Response response);
 
-        public async ETTask Handle(Entity entity, Address fromAddress, object actorMessage)
+        public async ETTask Handle(Entity entity, Address fromAddress, MessageObject actorMessage)
         {
             try
             {

+ 2 - 2
Unity/Assets/Scripts/Hotfix/Share/Module/Actor/ActorMessageHandler.cs

@@ -8,7 +8,7 @@ namespace ET
     {
         protected abstract ETTask Run(E entity, Message message);
 
-        public async ETTask Handle(Entity entity, Address fromAddress, object actorMessage)
+        public async ETTask Handle(Entity entity, Address fromAddress, MessageObject actorMessage)
         {
             if (actorMessage is not Message msg)
             {
@@ -48,7 +48,7 @@ namespace ET
     {
         protected abstract ETTask Run(E unit, Request request, Response response);
 
-        public async ETTask Handle(Entity entity, Address fromAddress, object actorMessage)
+        public async ETTask Handle(Entity entity, Address fromAddress, MessageObject actorMessage)
         {
             try
             {

+ 6 - 2
Unity/Assets/Scripts/Hotfix/Share/Module/Actor/ActorSenderComponentSystem.cs

@@ -110,11 +110,15 @@ namespace ET
             // 如果发向同一个进程,则扔到消息队列中
             if (actorId.Process == fiber.Process)
             {
-                ActorMessageQueue.Instance.Send(self.Fiber().Address, actorId, message);
+                ActorMessageQueue.Instance.Send(fiber.Address, actorId, message);
                 return;
             }
 
-            A2NetInner_Message netInnerMessage = new() { FromAddress = fiber.Address, ActorId = actorId, MessageObject = message };
+
+            A2NetInner_Message netInnerMessage = A2NetInner_Message.Create(true);
+            netInnerMessage.FromAddress = fiber.Address;
+            netInnerMessage.ActorId = actorId;
+            netInnerMessage.MessageObject = message;
             // 扔到NetInner纤程
             ActorMessageQueue.Instance.Send(new ActorId(actorId.Process, ConstFiberId.NetInner), netInnerMessage);
         }

+ 5 - 2
Unity/Assets/Scripts/Hotfix/Share/Module/Actor/MailBoxType_OrderedMessageHandler.cs

@@ -8,10 +8,13 @@
             HandleInner(args).Coroutine();
         }
 
-        private async ETTask HandleInner(MailBoxInvoker args)
+        private static async ETTask HandleInner(MailBoxInvoker args)
         {
             MailBoxComponent mailBoxComponent = args.MailBoxComponent;
-            MessageObject messageObject = args.MessageObject;
+            
+            // 对象池回收
+            using MessageObject messageObject = args.MessageObject;
+            
             CoroutineLockComponent coroutineLockComponent = mailBoxComponent.CoroutineLockComponent;
             if (coroutineLockComponent == null)
             {

+ 9 - 2
Unity/Assets/Scripts/Hotfix/Share/Module/Actor/MailBoxType_UnOrderedMessageHandler.cs

@@ -4,16 +4,23 @@
     public class MailBoxType_UnOrderedMessageHandler: AInvokeHandler<MailBoxInvoker>
     {
         public override void Handle(MailBoxInvoker args)
+        {
+            HandleAsync(args).Coroutine();
+        }
+        
+        private static async ETTask HandleAsync(MailBoxInvoker args)
         {
             MailBoxComponent mailBoxComponent = args.MailBoxComponent;
-            MessageObject messageObject = args.MessageObject;
+            
+            using MessageObject messageObject = args.MessageObject;
+            
             CoroutineLockComponent coroutineLockComponent = mailBoxComponent.CoroutineLockComponent;
             if (coroutineLockComponent == null)
             {
                 return;
             }
 
-            ActorMessageDispatcherComponent.Instance.Handle(mailBoxComponent.Parent, args.FromAddress, messageObject).Coroutine();
+            await ActorMessageDispatcherComponent.Instance.Handle(mailBoxComponent.Parent, args.FromAddress, messageObject);
         }
     }
 }

+ 4 - 0
Unity/Assets/Scripts/Model/Share/Module/Actor/ActorSenderComponent.cs

@@ -4,6 +4,10 @@ namespace ET
 {
     public class A2NetInner_Message: MessageObject, IActorMessage
     {
+        public static A2NetInner_Message Create(bool isFromPool = false) { return !isFromPool? new A2NetInner_Message() : ObjectPool.Instance.Fetch(typeof(A2NetInner_Message)) as A2NetInner_Message; }
+
+        public override void Dispose() { ObjectPool.Instance.Recycle(this); }
+        
         public Address FromAddress;
         public ActorId ActorId;
         public MessageObject MessageObject;