Просмотр исходного кода

1 kcp大消息分片支持,支持int长度的消息
2 目前只编译了mac版的kcp库,windows跟linux需要自己编译
3 去掉多线程支持,kcp多线程准备重新做

tanghai 4 лет назад
Родитель
Сommit
84fde6e34e

+ 2 - 1
Libs/Kcp/ikcp.c

@@ -395,7 +395,7 @@ void ikcp_setlog(void(*op)(const char *buf, int len, ikcpcb *kcp, void *user))
 //---------------------------------------------------------------------
 // user/upper level recv: returns size, returns below zero for EAGAIN
 //---------------------------------------------------------------------
-int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
+int ikcp_recv(ikcpcb *kcp, char *buffer, int index, int len)
 {
 	struct IQUEUEHEAD *p;
 	int ispeek = (len < 0)? 1 : 0;
@@ -403,6 +403,7 @@ int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
 	int recover = 0;
 	IKCPSEG *seg;
 	assert(kcp);
+	buffer += index;
 
 	if (iqueue_is_empty(&kcp->rcv_queue))
 		return -1;

+ 1 - 1
Libs/Kcp/ikcp.h

@@ -362,7 +362,7 @@ extern "C" {
 	KCPDLL void ikcp_setlog(void(*writelog)(const char *buf, int len, ikcpcb *kcp, void *user));
 
 // user/upper level recv: returns size, returns below zero for EAGAIN
-	KCPDLL int ikcp_recv(ikcpcb *kcp, char *buffer, int len);
+	KCPDLL int ikcp_recv(ikcpcb *kcp, char *buffer, int index, int len);
 
 // user/upper level send, returns below zero for error
 	KCPDLL int ikcp_send(ikcpcb *kcp, const char *buffer, int offset, int len);

+ 1 - 1
Server/Hotfix/Module/Message/NetInnerComponentSystem.cs

@@ -95,7 +95,7 @@ namespace ET
         // 这个channelId是由CreateConnectChannelId生成的
         public static Session Create(this NetInnerComponent self, IPEndPoint ipEndPoint)
         {
-            uint localConn = self.Service.CreateRandomLocalConn(self.Random);
+            uint localConn = self.Service.CreateRandomLocalConn();
             long channelId = self.Service.CreateConnectChannelId(localConn);
             Session session = self.CreateInner(channelId, ipEndPoint);
             return session;

+ 0 - 2
Server/Model/Module/Message/NetInnerComponent.cs

@@ -25,7 +25,5 @@ namespace ET
         public static NetInnerComponent Instance;
 
         public IMessageDispatcher MessageDispatcher { get; set; }
-        
-        public Random Random = new Random(Guid.NewGuid().GetHashCode());
     }
 }

+ 1 - 1
Unity/Assets/Hotfix/Module/Message/NetKcpComponentSystem.cs

@@ -1,4 +1,4 @@
-using System;
+using System;
 using System.IO;
 using System.Net;
 

+ 0 - 46
Unity/Assets/Hotfix/Module/Message/NetThreadComponentSystem.cs

@@ -10,17 +10,10 @@ namespace ET
         {
             NetThreadComponent.Instance = self;
             
-#if NET_THREAD
-            self.Thread = new Thread(self.Loop);
-            self.ThreadSynchronizationContext = new ThreadSynchronizationContext(self.Thread.ManagedThreadId);
-            self.Thread.Start();
-#else
             self.ThreadSynchronizationContext = ThreadSynchronizationContext.Instance;
-#endif
         }
     }
 
-#if !NET_THREAD
     [ObjectSystem]
     public class NetThreadComponentUpdateSystem: LateUpdateSystem<NetThreadComponent>
     {
@@ -32,7 +25,6 @@ namespace ET
             }
         }
     }
-#endif
     
     [ObjectSystem]
     public class NetThreadComponentDestroySystem: DestroySystem<NetThreadComponent>
@@ -45,13 +37,9 @@ namespace ET
     
     public static class NetThreadComponentSystem
     {
-#region 主线程
 
         public static void Stop(this NetThreadComponent self)
         {
-#if NET_THREAD
-            self.ThreadSynchronizationContext.Post(()=>{self.isRun = false;});
-#endif
         }
 
         public static void Add(this NetThreadComponent self, AService kService)
@@ -79,40 +67,6 @@ namespace ET
                 self.Services.Remove(kService);
             });
         }
-
-#endregion
-
-#if NET_THREAD
-#region 网络线程
-        public static void Loop(this NetThreadComponent self)
-        {
-            self.isRun = true;
-            while (true)
-            {
-                try
-                {
-                    if (!self.isRun)
-                    {
-                        return;
-                    }
-
-                    self.ThreadSynchronizationContext.Update();
-
-                    foreach (AService service in self.Services)
-                    {
-                        service.Update();
-                    }
-                }
-                catch (Exception e)
-                {
-                    Log.Error(e);
-                }
-                
-                Thread.Sleep(1);
-            }
-        }
-#endregion
-#endif
         
     }
 }

+ 1 - 56
Unity/Assets/Model/Module/CoroutineLock/CoroutineLockType.cs

@@ -6,64 +6,9 @@ namespace ET
         Location,                  // location进程上使用
         ActorLocationSender,       // ActorLocationSender中队列消息 
         Mailbox,                   // Mailbox中队列
-        AccountName,               // Realm上验证账号时使用
-        GateAccountLock,           // Gate上登陆账号时使用
-        LockPlayerName,            // 锁定玩家角色名字
         UnitId,                    // Map服务器上线下线时使用
-        GateOffline,               // Gate上下线用
-        SendMail,                  // 发送Mail时使用
-        DB,                        // 存储数据库
-        LevelSeal,                 //玩家请求是否达到等级封印时使用
-        ClientChangeScene,         // 客户端切换场景
-        GetCityUnit,               // 获得城池Unit
-        GetFamilyUnit,             // 获得家族Unit
-        GetFamilyEntity,           // 获得家族实体
-        LockFamilyName,            // 家族名字
-        EnterFamilyMainScene,      // 进入家族领地
-        EnterFamilyTrial,          // 进入家族试炼场
-        OpenTower,                 // 开启塔林
-        ZiJinKu,                   // 开启紫金窟
-        EnterCityBattlePreScene,   // 进入城战准备副本
-        TransferOtherCopy,         // 请求分配到一个新的战场
-        CityBattleAuction,         // 城战竞拍
-        CityBattleRecruitSubMoney, // 城战招募状态扣钱
-        JoinOrExitFamily,          // 玩家加入、退出、踢出家族
-        HitDevilPlayerData,        //棒打魔王结算增加烧饼,玩家登陆到棒打魔王服.
-        MedicalNumber,             //伏鼎牌照抽奖
-        GetExtraExpress,           // 领取快递
-        GetYinYangUnit,            // 获取阴阳玑Unit
-        GetFTUnitInfo,             // 获取好友组队服玩家信息
-        TuTeng,                    // 批量获取组合图腾
-        GetPlayerChat,             //获取聊天存储记录
-        LoadSystemComponent,       // 加载各个系统组件
-        ChargeByOid,               // 充值
-        UnitCache,                 // 查询缓存
-        QuestDrop,                 //任务掉落
-        Login,                     // 登陆时,反正访问数据库峰值太高的排队
-        CityBroadcast,             // 城池广播
-        HomeWorld,                 // 师傅世界服操作
-        GetCache,                  // 获取缓存
-        GetHomeUnit,               // 获取家园玩家
-        HandleFamilyEntity,        // 处理家族实体
-        AskScene,                  // 请求场景
-        GMService,
-        UnitCacheGet, // UnitCache查询组件
-        Auction,      // 拍卖
-        Match,
-        JGSettleRecord, // 九宫结算记录
-        AugurMgrBuyNumber,
-        GetTeaUnit,              // 获取茶楼Unit
-        AnimalCheck_LockRoom, // 锁住房间
-
-        // Client
+        DB,
         Resources,
-        ResourcesLoader,
-        GiveTax, // 给国库上税
-        
-        SceneMapResourcesLoader,
-        SceneMapResources,
-
-        ChangeModel,//切换模型频繁
 
         Max, // 这个必须在最后
     }

+ 4 - 0
Unity/Assets/Model/Module/Message/ErrorCode.cs

@@ -25,6 +25,10 @@ namespace ET
         public const int ERR_WebsocketPeerReset = 100218;
         public const int ERR_WebsocketMessageTooBig = 100219;
         public const int ERR_WebsocketRecvError = 100220;
+        
+        public const int ERR_KcpReadNotSame = 100230;
+        public const int ERR_KcpSplitError = 100231;
+        public const int ERR_KcpSplitCountError = 100232;
 
         public const int ERR_ActorNoMailBoxComponent = 110003;
         public const int ERR_ActorLocationSenderTimeout = 110004;

+ 0 - 9
Unity/Assets/Model/Module/Message/NetThreadComponent.cs

@@ -13,17 +13,8 @@ namespace ET
         public const int recvMaxIdleTime = 60000;
         public const int sendMaxIdleTime = 60000;
 
-#if NET_THREAD
-        public Thread Thread;
-#endif
         public ThreadSynchronizationContext ThreadSynchronizationContext;
         
         public HashSet<AService> Services = new HashSet<AService>();
-
-#if NET_THREAD
-        public bool isRun;
-#endif
-        
-        public Random Random = new Random(Guid.NewGuid().GetHashCode());
     }
 }

+ 2 - 57
Unity/Assets/Model/Module/Network/AService.cs

@@ -17,13 +17,11 @@ namespace ET
             return (--this.connectIdGenerater << 32) | localConn;
         }
         
-        public uint CreateRandomLocalConn(Random random)
+        public uint CreateRandomLocalConn()
         {
-            return (1u << 30) | random.RandUInt32();
+            return (1u << 30) | RandomHelper.RandUInt32();
         }
 
-#region 网络线程
-        
         // localConn放在低32bit
         private long acceptIdGenerater = 1;
         public long CreateAcceptChannelId(uint localConn)
@@ -47,46 +45,21 @@ namespace ET
         
         protected void OnAccept(long channelId, IPEndPoint ipEndPoint)
         {
-#if NET_THREAD
-            ThreadSynchronizationContext.Instance.Post(() =>
-            {
-                this.AcceptCallback.Invoke(channelId, ipEndPoint);
-            });
-#else
             this.AcceptCallback.Invoke(channelId, ipEndPoint);
-#endif
         }
 
         public void OnRead(long channelId, MemoryStream memoryStream)
         {
-#if NET_THREAD
-            ThreadSynchronizationContext.Instance.Post(() =>
-            {
-                this.ReadCallback.Invoke(channelId, memoryStream);
-            });
-#else
             this.ReadCallback.Invoke(channelId, memoryStream);
-#endif
         }
 
         public void OnError(long channelId, int e)
         {
             this.Remove(channelId);
             
-#if NET_THREAD
-            ThreadSynchronizationContext.Instance.Post(() =>
-            {
-                this.ErrorCallback?.Invoke(channelId, e);
-            });
-#else
             this.ErrorCallback?.Invoke(channelId, e);
-#endif
         }
 
-#endregion
-
-
-#region 主线程
         
         public Action<long, IPEndPoint> AcceptCallback;
         public Action<long, int> ErrorCallback;
@@ -94,50 +67,22 @@ namespace ET
 
         public void Destroy()
         {
-#if NET_THREAD
-            this.ThreadSynchronizationContext.Post(this.Dispose);
-#else
             this.Dispose();
-#endif
         }
 
         public void RemoveChannel(long channelId)
         {
-#if NET_THREAD
-            this.ThreadSynchronizationContext.Post(() =>
-            {
-                this.Remove(channelId);
-            });
-#else
             this.Remove(channelId);
-#endif
         }
 
         public void SendStream(long channelId, long actorId, MemoryStream stream)
         {
-#if NET_THREAD
-            this.ThreadSynchronizationContext.Post(() =>
-            {
-                this.Send(channelId, actorId, stream);
-            });
-#else
             this.Send(channelId, actorId, stream);
-#endif
         }
 
         public void GetOrCreate(long id, IPEndPoint address)
         {
-#if NET_THREAD
-            this.ThreadSynchronizationContext.Post(()=>
-            {
-                this.Get(id, address);
-            });
-#else
             this.Get(id, address);
-#endif
         }
-
-#endregion
-
     }
 }

+ 126 - 70
Unity/Assets/Model/Module/Network/KChannel.cs

@@ -1,5 +1,4 @@
 using System;
-using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.IO;
 using System.Net;
@@ -16,16 +15,13 @@ namespace ET
 	
 	public class KChannel : AChannel
 	{
-		public KService Service;
-		
-		// 保存所有的channel
-		public static readonly Dictionary<uint, KChannel> kChannels = new Dictionary<uint, KChannel>();
+		public static readonly Dictionary<IntPtr, KChannel> KcpPtrChannels = new Dictionary<IntPtr, KChannel>();
 		
-		public static readonly ConcurrentDictionary<long, ulong> idLocalRemoteConn = new ConcurrentDictionary<long, ulong>();
+		public KService Service;
 		
 		private Socket socket;
 
-		private IntPtr kcp;
+		public IntPtr kcp { get; private set; }
 
 		private readonly Queue<KcpWaitPacket> sendBuffer = new Queue<KcpWaitPacket>();
 
@@ -36,27 +32,35 @@ namespace ET
 		public uint LocalConn { get; set; }
 		public uint RemoteConn { get; set; }
 
-		private readonly byte[] sendCache = new byte[1024 * 1024];
+		private readonly byte[] sendCache = new byte[2 * 1024];
 		
 		public bool IsConnected { get; private set; }
 
 		public string RealAddress { get; set; }
 		
+		private const int maxPacketSize = 10000;
+
+		private MemoryStream ms = new MemoryStream(maxPacketSize);
+
+		private MemoryStream readMemory;
+		private int needReadSplitCount;
+		
 		private void InitKcp()
 		{
+			KcpPtrChannels.Add(this.kcp, this);
 			switch (this.Service.ServiceType)
 			{
 				case ServiceType.Inner:
 					Kcp.KcpNodelay(kcp, 1, 10, 2, 1);
-					Kcp.KcpWndsize(kcp, 1024 * 100, 1024 * 100);
+					Kcp.KcpWndsize(kcp, ushort.MaxValue, ushort.MaxValue);
 					Kcp.KcpSetmtu(kcp, 1400); // 默认1400
-					Kcp.KcpSetminrto(kcp, 10);
+					Kcp.KcpSetminrto(kcp, 30);
 					break;
 				case ServiceType.Outer:
 					Kcp.KcpNodelay(kcp, 1, 10, 2, 1);
-					Kcp.KcpWndsize(kcp, 128, 128);
+					Kcp.KcpWndsize(kcp, 256, 256);
 					Kcp.KcpSetmtu(kcp, 470);
-					Kcp.KcpSetminrto(kcp, 10);
+					Kcp.KcpSetminrto(kcp, 30);
 					break;
 			}
 
@@ -66,23 +70,16 @@ namespace ET
 		public KChannel(long id, uint localConn, Socket socket, IPEndPoint remoteEndPoint, KService kService)
 		{
 			this.LocalConn = localConn;
-			if (kChannels.ContainsKey(this.LocalConn))
-			{
-				throw new Exception($"channel create error: {this.LocalConn} {remoteEndPoint} {this.ChannelType}");
-			}
 
 			this.Id = id;
 			this.ChannelType = ChannelType.Connect;
 			
 			Log.Info($"channel create: {this.Id} {this.LocalConn} {remoteEndPoint} {this.ChannelType}");
 			
+			this.kcp = IntPtr.Zero;
 			this.Service = kService;
 			this.RemoteAddress = remoteEndPoint;
 			this.socket = socket;
-			this.kcp = Kcp.KcpCreate(this.RemoteConn, (IntPtr) this.LocalConn);
-
-			kChannels.Add(this.LocalConn, this);
-			
 			this.lastRecvTime = kService.TimeNow;
 			this.CreateTime = kService.TimeNow;
 
@@ -93,11 +90,6 @@ namespace ET
 		// accept
 		public KChannel(long id, uint localConn, uint remoteConn, Socket socket, IPEndPoint remoteEndPoint, KService kService)
 		{
-			if (kChannels.ContainsKey(this.LocalConn))
-			{
-				throw new Exception($"channel create error: {localConn} {remoteEndPoint} {this.ChannelType}");
-			}
-
 			this.Id = id;
 			this.ChannelType = ChannelType.Accept;
 			
@@ -108,22 +100,13 @@ namespace ET
 			this.RemoteConn = remoteConn;
 			this.RemoteAddress = remoteEndPoint;
 			this.socket = socket;
-			this.kcp = Kcp.KcpCreate(this.RemoteConn, (IntPtr) localConn);
-
-			kChannels.Add(this.LocalConn, this);
+			this.kcp = Kcp.KcpCreate(this.RemoteConn, IntPtr.Zero);
+			this.InitKcp();
 			
 			this.lastRecvTime = kService.TimeNow;
 			this.CreateTime = kService.TimeNow;
-
-			this.InitKcp();
 		}
-
-		
-
-#region 网络线程
-
-		
-
+	
 
 		public override void Dispose()
 		{
@@ -136,9 +119,6 @@ namespace ET
 			uint remoteConn = this.RemoteConn;
 			Log.Info($"channel dispose: {this.Id} {localConn} {remoteConn}");
 			
-			kChannels.Remove(localConn);
-			idLocalRemoteConn.TryRemove(this.Id, out ulong _);
-			
 			long id = this.Id;
 			this.Id = 0;
 			this.Service.Remove(id);
@@ -155,6 +135,7 @@ namespace ET
 
 			if (this.kcp != IntPtr.Zero)
 			{
+				KcpPtrChannels.Remove(this.kcp);
 				Kcp.KcpRelease(this.kcp);
 				this.kcp = IntPtr.Zero;
 			}
@@ -170,12 +151,9 @@ namespace ET
 				return;
 			}
 
-			this.kcp = Kcp.KcpCreate(this.RemoteConn, new IntPtr(this.LocalConn));
+			this.kcp = Kcp.KcpCreate(this.RemoteConn, IntPtr.Zero);
 			this.InitKcp();
 
-			ulong localRmoteConn = ((ulong) this.RemoteConn << 32) | this.LocalConn;
-			idLocalRemoteConn.TryAdd(this.Id, localRmoteConn);
-
 			Log.Info($"channel connected: {this.Id} {this.LocalConn} {this.RemoteConn} {this.RemoteAddress}");
 			this.IsConnected = true;
 			this.lastRecvTime = this.Service.TimeNow;
@@ -209,7 +187,8 @@ namespace ET
 				buffer.WriteTo(5, this.RemoteConn);
 				this.socket.SendTo(buffer, 0, 9, SocketFlags.None, this.RemoteAddress);
 				Log.Info($"kchannel connect {this.Id} {this.LocalConn} {this.RemoteConn} {this.RealAddress} {this.socket.LocalEndPoint}");
-				// 200毫秒后再次update发送connect请求
+				
+				// 300毫秒后再次update发送connect请求
 				this.Service.AddToUpdateNextTime(timeNow + 300, this.Id);
 			}
 			catch (Exception e)
@@ -231,14 +210,13 @@ namespace ET
 			// 如果还没连接上,发送连接请求
 			if (!this.IsConnected)
 			{
-				// 20秒没连接上则报错
-				if (timeNow - this.CreateTime > 10 * 1000)
+				// 10秒超时没连接上则报错
+				if (timeNow - this.CreateTime > 10000)
 				{
 					Log.Error($"kChannel connect timeout: {this.Id} {this.RemoteConn} {timeNow} {this.CreateTime} {this.ChannelType} {this.RemoteAddress}");
 					this.OnError(ErrorCode.ERR_KcpConnectTimeout);
 					return;
 				}
-
 				switch (ChannelType)
 				{
 					case ChannelType.Connect:
@@ -248,6 +226,11 @@ namespace ET
 				return;
 			}
 
+			if (this.kcp == IntPtr.Zero)
+			{
+				return;
+			}
+			
 			try
 			{
 				Kcp.KcpUpdate(this.kcp, timeNow);
@@ -259,11 +242,8 @@ namespace ET
 				return;
 			}
 
-			if (this.kcp != IntPtr.Zero)
-			{
-				uint nextUpdateTime = Kcp.KcpCheck(this.kcp, timeNow);
-				this.Service.AddToUpdateNextTime(nextUpdateTime, this.Id);
-			}
+			uint nextUpdateTime = Kcp.KcpCheck(this.kcp, timeNow);
+			this.Service.AddToUpdateNextTime(nextUpdateTime, this.Id);
 		}
 
 		public void HandleRecv(byte[] date, int offset, int length)
@@ -292,31 +272,83 @@ namespace ET
 				if (n == 0)
 				{
 					this.OnError((int)SocketError.NetworkReset);
-					break;
+					return;
 				}
 
-				MemoryStream ms = MessageSerializeHelper.GetStream(n);
-				
-				ms.SetLength(n);
-				ms.Seek(0, SeekOrigin.Begin);
-				byte[] buffer = ms.GetBuffer();
-				int count = Kcp.KcpRecv(this.kcp, buffer, n);
-				if (n != count)
+
+				if (this.needReadSplitCount > 0) // 说明消息分片了
 				{
-					break;
+					byte[] buffer = readMemory.GetBuffer();
+					int count = Kcp.KcpRecv(this.kcp, buffer, (int)this.readMemory.Length - this.needReadSplitCount, n);
+					this.needReadSplitCount -= count;
+					if (n != count)
+					{
+						Log.Error($"kchannel read error1: {this.LocalConn} {this.RemoteConn}");
+						this.OnError(ErrorCode.ERR_KcpReadNotSame);
+						return;
+					}
+					
+					if (this.needReadSplitCount < 0)
+					{
+						Log.Error($"kchannel read error2: {this.LocalConn} {this.RemoteConn}");
+						this.OnError(ErrorCode.ERR_KcpSplitError);
+						return;
+					}
+										
+					// 没有读完
+					if (this.needReadSplitCount != 0)
+					{
+						continue;
+					}
 				}
+				else
+				{
+					this.readMemory = this.ms;
+					this.readMemory.SetLength(n);
+					this.readMemory.Seek(0, SeekOrigin.Begin);
+					
+					byte[] buffer = readMemory.GetBuffer();
+					int count = Kcp.KcpRecv(this.kcp, buffer, 0, n);
+					if (n != count)
+					{
+						break;
+					}
+					
+					// 判断是不是分片
+					if (n == 8)
+					{
+						int headInt = BitConverter.ToInt32(this.readMemory.GetBuffer(), 0);
+						if (headInt == 0)
+						{
+							this.needReadSplitCount = BitConverter.ToInt32(readMemory.GetBuffer(), 4);
+							if (this.needReadSplitCount <= maxPacketSize)
+							{
+								Log.Error($"kchannel read error3: {this.needReadSplitCount} {this.LocalConn} {this.RemoteConn}");
+								this.OnError(ErrorCode.ERR_KcpSplitCountError);
+								return;
+							}
+							this.readMemory = new MemoryStream(this.needReadSplitCount);
+							this.readMemory.SetLength(this.needReadSplitCount);
+							this.readMemory.Seek(0, SeekOrigin.Begin);
+							continue;
+						}
+					}
+				}
+
 
 				switch (this.Service.ServiceType)
 				{
 					case ServiceType.Inner:
-						ms.Seek(Packet.ActorIdLength + Packet.OpcodeLength, SeekOrigin.Begin);
+						this.readMemory.Seek(Packet.ActorIdLength + Packet.OpcodeLength, SeekOrigin.Begin);
 						break;
 					case ServiceType.Outer:
-						ms.Seek(Packet.OpcodeLength, SeekOrigin.Begin);
+						this.readMemory.Seek(Packet.OpcodeLength, SeekOrigin.Begin);
 						break;
 				}
 				this.lastRecvTime = this.Service.TimeNow;
-				this.OnRead(ms);
+				MemoryStream mem = this.readMemory;
+				this.readMemory = null;
+				this.OnRead(mem);
 			}
 		}
 
@@ -362,13 +394,39 @@ namespace ET
 			}
 
 			MemoryStream memoryStream = kcpWaitPacket.MemoryStream;
+			int count = (int) (memoryStream.Length - memoryStream.Position);
+			
 			if (this.Service.ServiceType == ServiceType.Inner)
 			{
 				memoryStream.GetBuffer().WriteTo(0, kcpWaitPacket.ActorId);
 			}
 
-			int count = (int) (memoryStream.Length - memoryStream.Position);
-			Kcp.KcpSend(this.kcp, memoryStream.GetBuffer(), (int)memoryStream.Position, count);
+			// 超出maxPacketSize需要分片
+			if (count <= maxPacketSize)
+			{
+				Kcp.KcpSend(this.kcp, memoryStream.GetBuffer(), (int)memoryStream.Position, count);
+			}
+			else
+			{
+				// 先发分片信息
+				this.sendCache.WriteTo(0, 0);
+				this.sendCache.WriteTo(4, count);
+				Kcp.KcpSend(this.kcp, this.sendCache, 0, 8);
+
+				// 分片发送
+				int alreadySendCount = 0;
+				while (alreadySendCount < count)
+				{
+					int leftCount = count - alreadySendCount;
+					
+					int sendCount = leftCount < maxPacketSize? leftCount: maxPacketSize;
+					
+					Kcp.KcpSend(this.kcp, memoryStream.GetBuffer(), (int)memoryStream.Position + alreadySendCount, sendCount);
+					
+					alreadySendCount += sendCount;
+				}
+			}
+
 			this.Service.AddToUpdateNextTime(0, this.Id);
 		}
 		
@@ -394,7 +452,7 @@ namespace ET
 				
 				if (n > maxWaitSize)
 				{
-					Log.Error($"kcp wait snd too large: {n}: {this.Id} {this.RemoteConn}");
+					Log.Error($"kcp wait snd too large: {n}: {this.Id} {this.LocalConn} {this.RemoteConn}");
 					this.OnError(ErrorCode.ERR_KcpWaitSendSizeTooLarge);
 					return;
 				}
@@ -420,7 +478,5 @@ namespace ET
 			this.Service.Remove(channelId);
 			this.Service.OnError(channelId, error);
 		}
-		
-#endregion
 	}
 }

+ 60 - 72
Unity/Assets/Model/Module/Network/KService.cs

@@ -1,4 +1,5 @@
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.IO;
 using System.Linq;
@@ -14,10 +15,6 @@ namespace ET
         public const byte ACK = 2;
         public const byte FIN = 3;
         public const byte MSG = 4;
-        
-        public const byte RouterReconnect = 10;
-        public const byte RouterAck = 11;
-        public const byte RouterSYN = 12;
     }
 
     public enum ServiceType
@@ -29,14 +26,14 @@ namespace ET
     public sealed class KService: AService
     {
         // KService创建的时间
-        public long StartTime;
+        private readonly long startTime;
 
         // 当前时间 - KService创建的时间, 线程安全
         public uint TimeNow
         {
             get
             {
-                return (uint) (TimeHelper.ClientNow() - this.StartTime);
+                return (uint) (TimeHelper.ClientNow() - this.startTime);
             }
         }
 
@@ -44,15 +41,15 @@ namespace ET
 
 
 #region 回调方法
+
         static KService()
         {
             //Kcp.KcpSetLog(KcpLog);
             Kcp.KcpSetoutput(KcpOutput);
         }
-		
+
         private static readonly byte[] logBuffer = new byte[1024];
-        
-		
+
 #if ENABLE_IL2CPP
 		[AOT.MonoPInvokeCallback(typeof(KcpOutput))]
 #endif
@@ -69,7 +66,6 @@ namespace ET
             }
         }
 
-
 #if ENABLE_IL2CPP
 		[AOT.MonoPInvokeCallback(typeof(KcpOutput))]
 #endif
@@ -77,7 +73,16 @@ namespace ET
         {
             try
             {
-                KChannel kChannel = KChannel.kChannels[(uint) user];
+                if (kcp == IntPtr.Zero)
+                {
+                    return 0;
+                }
+
+                if (!KChannel.KcpPtrChannels.TryGetValue(kcp, out KChannel kChannel))
+                {
+                    return 0;
+                }
+                
                 kChannel.Output(bytes, len);
             }
             catch (Exception e)
@@ -85,19 +90,17 @@ namespace ET
                 Log.Error(e);
                 return len;
             }
+
             return len;
         }
+
 #endregion
 
-        
-        
-#region 主线程
-        
         public KService(ThreadSynchronizationContext threadSynchronizationContext, IPEndPoint ipEndPoint, ServiceType serviceType)
         {
             this.ServiceType = serviceType;
             this.ThreadSynchronizationContext = threadSynchronizationContext;
-            this.StartTime = TimeHelper.ClientNow();
+            this.startTime = TimeHelper.ClientNow();
             this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
             if (!RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
             {
@@ -119,7 +122,7 @@ namespace ET
         {
             this.ServiceType = serviceType;
             this.ThreadSynchronizationContext = threadSynchronizationContext;
-            this.StartTime = TimeHelper.ClientNow();
+            this.startTime = TimeHelper.ClientNow();
             this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
             // 作为客户端不需要修改发送跟接收缓冲区大小
             this.socket.Bind(new IPEndPoint(IPAddress.Any, 0));
@@ -132,41 +135,27 @@ namespace ET
                 this.socket.IOControl((int) SIO_UDP_CONNRESET, new[] { Convert.ToByte(false) }, null);
             }
         }
-        
 
         public void ChangeAddress(long id, IPEndPoint address)
         {
-#if NET_THREAD
-            this.ThreadSynchronizationContext.Post(() =>
-                    {
-#endif
-                        KChannel kChannel = this.Get(id);
-                        if (kChannel == null)
-                        {
-                            return;
-                        }
+            KChannel kChannel = this.Get(id);
+            if (kChannel == null)
+            {
+                return;
+            }
 
-                        Log.Info($"channel change address: {id} {address}");
-                        kChannel.RemoteAddress = address;
-#if NET_THREAD
-                    }
-        );
-#endif
+            Log.Info($"channel change address: {id} {address}");
+            kChannel.RemoteAddress = address;
         }
 
-#endregion
 
-#region 网络线程
+        // 保存所有的channel
         private readonly Dictionary<long, KChannel> idChannels = new Dictionary<long, KChannel>();
         private readonly Dictionary<long, KChannel> localConnChannels = new Dictionary<long, KChannel>();
         private readonly Dictionary<long, KChannel> waitConnectChannels = new Dictionary<long, KChannel>();
-        private readonly List<long> waitRemoveChannels = new List<long>();
 
         private readonly byte[] cache = new byte[8192];
         private EndPoint ipEndPoint = new IPEndPoint(IPAddress.Any, 0);
-        
-        // 网络线程
-        private readonly Random random = new Random(Guid.NewGuid().GetHashCode());
 
         // 下帧要更新的channel
         private readonly HashSet<long> updateChannels = new HashSet<long>();
@@ -231,7 +220,7 @@ namespace ET
                     KChannel kChannel = null;
                     switch (flag)
                     {
-#if NOT_CLIENT
+#if NOT_UNITY
                         case KcpProtocalType.SYN: // accept
                         {
                             // 长度!=5,不是SYN消息
@@ -253,7 +242,7 @@ namespace ET
                             this.waitConnectChannels.TryGetValue(remoteConn, out kChannel);
                             if (kChannel == null)
                             {
-                                localConn = CreateRandomLocalConn(this.random);
+                                localConn = CreateRandomLocalConn();
                                 // 已存在同样的localConn,则不处理,等待下次sync
                                 if (this.localConnChannels.ContainsKey(localConn))
                                 {
@@ -275,7 +264,6 @@ namespace ET
                                 IPEndPoint realEndPoint = kChannel.RealAddress == null? kChannel.RemoteAddress : NetworkHelper.ToIPEndPoint(kChannel.RealAddress);
                                 this.OnAccept(kChannel.Id, realEndPoint);
                             }
-
                             if (kChannel.RemoteConn != remoteConn)
                             {
                                 break;
@@ -358,7 +346,6 @@ namespace ET
                             {
                                 break;
                             }
-                            
                             // 处理chanel
                             remoteConn = BitConverter.ToUInt32(this.cache, 1);
                             localConn = BitConverter.ToUInt32(this.cache, 5);
@@ -388,7 +375,7 @@ namespace ET
             }
         }
 
-        private KChannel Get(long id)
+        public KChannel Get(long id)
         {
             KChannel channel;
             this.idChannels.TryGetValue(id, out channel);
@@ -404,17 +391,18 @@ namespace ET
 
         protected override void Get(long id, IPEndPoint address)
         {
-            if (this.idChannels.TryGetValue(id, out KChannel channel))
+            if (this.idChannels.TryGetValue(id, out KChannel kChannel))
             {
                 return;
             }
+
             try
             {
                 // 低32bit是localConn
-                uint localConn = (uint)((ulong) id & uint.MaxValue);
-                channel = new KChannel(id, localConn, this.socket, address, this);
-                this.idChannels.Add(id, channel);
-                this.localConnChannels.Add(channel.LocalConn, channel);
+                uint localConn = (uint) ((ulong) id & uint.MaxValue);
+                kChannel = new KChannel(id, localConn, this.socket, address, this);
+                this.idChannels.Add(id, kChannel);
+                this.localConnChannels.Add(kChannel.LocalConn, kChannel);
             }
             catch (Exception e)
             {
@@ -435,7 +423,7 @@ namespace ET
             {
                 if (waitChannel.LocalConn == kChannel.LocalConn)
                 {
-                    this.waitConnectChannels.Remove(kChannel.RemoteConn);        
+                    this.waitConnectChannels.Remove(kChannel.RemoteConn);
                 }
             }
             kChannel.Dispose();
@@ -522,34 +510,35 @@ namespace ET
 
         private void RemoveConnectTimeoutChannels()
         {
-            this.waitRemoveChannels.Clear();
-            
-            foreach (long channelId in this.waitConnectChannels.Keys)
+            using (ListComponent<long> waitRemoveChannels = ListComponent<long>.Create())
             {
-                this.waitConnectChannels.TryGetValue(channelId, out KChannel kChannel);
-                if (kChannel == null)
-                {
-                    Log.Error($"RemoveConnectTimeoutChannels not found kchannel: {channelId}");
-                    continue;
-                }
-                
-                // 连接上了要马上删除
-                if (kChannel.IsConnected)
+                foreach (long channelId in this.waitConnectChannels.Keys)
                 {
-                    this.waitRemoveChannels.Add(channelId);
+                    this.waitConnectChannels.TryGetValue(channelId, out KChannel kChannel);
+                    if (kChannel == null)
+                    {
+                        Log.Error($"RemoveConnectTimeoutChannels not found kchannel: {channelId}");
+                        continue;
+                    }
+
+                    // 连接上了要马上删除
+                    if (kChannel.IsConnected)
+                    {
+                        waitRemoveChannels.List.Add(channelId);
+                    }
+
+                    // 10秒连接超时
+                    if (this.TimeNow > kChannel.CreateTime + 10 * 1000)
+                    {
+                        waitRemoveChannels.List.Add(channelId);
+                    }
                 }
 
-                // 10秒连接超时
-                if (this.TimeNow > kChannel.CreateTime + 10 * 1000)
+                foreach (long channelId in waitRemoveChannels.List)
                 {
-                    this.waitRemoveChannels.Add(channelId);
+                    this.waitConnectChannels.Remove(channelId);
                 }
             }
-
-            foreach (long channelId in this.waitRemoveChannels)
-            {
-                this.waitConnectChannels.Remove(channelId);
-            }
         }
 
         // 计算到期需要update的channel
@@ -591,6 +580,5 @@ namespace ET
                 this.timeId.Remove(k);
             }
         }
-#endregion
     }
 }

BIN
Unity/Assets/Plugins/MacOS/libkcp.dylib


+ 7 - 7
Unity/Assets/ThirdParty/ShareLib/Kcp/Kcp.cs

@@ -13,7 +13,7 @@ namespace ET
     {
         public const int OneM = 1024 * 1024;
         public const int InnerMaxWaitSize = 1024 * 1024;
-        public const int OuterMaxWaitSize = 1024 * 10;
+        public const int OuterMaxWaitSize = 1024 * 1024;
         
         
         private static KcpOutput KcpOutput;
@@ -24,7 +24,6 @@ namespace ET
 #else
         const string KcpDLL = "kcp";
 #endif
-
         
         [DllImport(KcpDLL, CallingConvention=CallingConvention.Cdecl)]
         private static extern uint ikcp_check(IntPtr kcp, uint current);
@@ -41,7 +40,7 @@ namespace ET
         [DllImport(KcpDLL, CallingConvention=CallingConvention.Cdecl)]
         private static extern int ikcp_peeksize(IntPtr kcp);
         [DllImport(KcpDLL, CallingConvention=CallingConvention.Cdecl)]
-        private static extern int ikcp_recv(IntPtr kcp, byte[] buffer, int len);
+        private static extern int ikcp_recv(IntPtr kcp, byte[] buffer, int index, int len);
         [DllImport(KcpDLL, CallingConvention=CallingConvention.Cdecl)]
         private static extern void ikcp_release(IntPtr kcp);
         [DllImport(KcpDLL, CallingConvention=CallingConvention.Cdecl)]
@@ -129,18 +128,19 @@ namespace ET
             return ret;
         }
 
-        public static int KcpRecv(IntPtr kcp, byte[] buffer, int len)
+        public static int KcpRecv(IntPtr kcp, byte[] buffer, int index, int len)
         {
             if (kcp == IntPtr.Zero)
             {
                 throw new Exception($"kcp error, kcp point is zero");
             }
 
-            if (buffer.Length != len)
+            if (buffer.Length < index + len)
             {
-                throw new Exception($"kcp error, KcpRecv error: {len}");
+                throw new Exception($"kcp error, KcpRecv error: {index} {len}");
             }
-            int ret = ikcp_recv(kcp, buffer, len);
+            
+            int ret = ikcp_recv(kcp, buffer, index, len);
             return ret;
         }