فهرست منبع

1.增加channel自动连接功能,每5秒连接一次
2.发送消息时,假如还未连接,数据放到channel的缓冲里面,连接成功后自动发送

tanghai 10 سال پیش
والد
کامیت
da3dde2abe

+ 3 - 27
CSharp/Game/Model/Component/NetworkComponent.cs

@@ -93,34 +93,10 @@ namespace Model
 			}
 		}
 
-		public async void SendAsync(string address, byte[] buffer)
+		public void SendAsync(string address, byte[] buffer)
 		{
-			Queue<byte[]> queue;
-			AChannel channel;
-			// 连接已存在
-			if (this.service.HasChannel(address))
-			{
-				channel = await this.service.GetChannel(address);
-				channel.SendAsync(buffer);
-				return;
-			}
-
-			// 连接不存在,但是处于正在连接过程中
-			if (this.cache.TryGetValue(address, out queue))
-			{
-				queue.Enqueue(buffer);
-				return;
-			}
-
-			// 连接不存在,需要启动连接
-			queue = new Queue<byte[]>();
-			queue.Enqueue(buffer);
-			this.cache[address] = queue;
-			channel = await this.service.GetChannel(address);
-			while (queue.Count > 0)
-			{
-				channel.SendAsync(queue.Dequeue());
-			}
+			AChannel channel = this.service.GetChannel(address);
+			channel.SendAsync(buffer);
 		}
 
 		// 消息回调或者超时回调

+ 8 - 0
CSharp/Platform/Common/Base/TimerManager.cs

@@ -1,5 +1,6 @@
 using System;
 using System.Collections.Generic;
+using System.Threading.Tasks;
 using Common.Helper;
 using MongoDB.Bson;
 
@@ -42,6 +43,13 @@ namespace Common.Base
 			this.timeGuid.Remove(timer.Time, timer.Id);
 		}
 
+		public Task<bool> Sleep(int time)
+		{
+			TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
+			this.Add(TimeHelper.Now() + time, () => { tcs.SetResult(true); });
+			return tcs.Task;
+		}
+
 		public void Refresh()
 		{
 			long timeNow = TimeHelper.Now();

+ 2 - 0
CSharp/Platform/Common/Network/AChannel.cs

@@ -25,6 +25,8 @@ namespace Common.Network
 			this.service = service;
 		}
 
+		public abstract Task<bool> ConnectAsync();
+
 		/// <summary>
 		/// 发送消息
 		/// </summary>

+ 2 - 4
CSharp/Platform/Common/Network/IService.cs

@@ -21,14 +21,12 @@ namespace Common.Network
 
 		AChannel GetChannel(ObjectId id);
 
-		Task<AChannel> GetChannel(string host, int port);
+		AChannel GetChannel(string host, int port);
 
-		Task<AChannel> GetChannel(string address);
+		AChannel GetChannel(string address);
 
 		Task<AChannel> GetChannel();
 
-		bool HasChannel(string address);
-
 		void Remove(AChannel channel);
 
 		void Update();

+ 39 - 12
CSharp/Platform/TNet/TChannel.cs

@@ -9,7 +9,7 @@ using MongoDB.Bson;
 
 namespace TNet
 {
-	internal class TChannel: AChannel
+	public class TChannel: AChannel
 	{
 		private const int SendInterval = 0;
 		private TSocket socket;
@@ -21,17 +21,27 @@ namespace TNet
 		private Action onParseComplete = () => { };
 		private readonly PacketParser parser;
 		private readonly string remoteAddress;
+		private bool isConnected;
 
 		public TChannel(TSocket socket, TService service): base(service)
 		{
+			this.isConnected = true;
 			this.socket = socket;
 			this.service = service;
 			this.parser = new PacketParser(this.recvBuffer);
 			this.remoteAddress = this.socket.RemoteAddress;
-
 			this.StartRecv();
 		}
 
+		public TChannel(TSocket socket, string host, int port, TService service)
+			: base(service)
+		{
+			this.socket = socket;
+			this.service = service;
+			this.parser = new PacketParser(this.recvBuffer);
+			this.remoteAddress = host + ":" + port;
+		}
+
 		private void Dispose(bool disposing)
 		{
 			if (this.socket == null)
@@ -63,15 +73,34 @@ namespace TNet
 			GC.SuppressFinalize(this);
 		}
 
+		public override async Task<bool> ConnectAsync()
+		{
+			string[] ss = this.RemoteAddress.Split(':');
+			int port = int.Parse(ss[1]);
+			bool result = await this.socket.ConnectAsync(ss[0], port);
+			this.isConnected = true;
+			this.SetStartSendFlag();
+			this.StartRecv();
+			return result;
+		}
+
+		private void SetStartSendFlag()
+		{
+			if (this.sendTimer == ObjectId.Empty)
+			{
+				this.sendTimer = this.service.Timer.Add(TimeHelper.Now() + SendInterval, this.StartSend);
+			}
+		}
+
 		public override void SendAsync(
 				byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
 		{
 			byte[] size = BitConverter.GetBytes(buffer.Length);
 			this.sendBuffer.SendTo(size);
 			this.sendBuffer.SendTo(buffer);
-			if (this.sendTimer == ObjectId.Empty)
+			if (this.isConnected)
 			{
-				this.sendTimer = this.service.Timer.Add(TimeHelper.Now() + SendInterval, this.StartSend);
+				this.SetStartSendFlag();
 			}
 		}
 
@@ -85,10 +114,9 @@ namespace TNet
 			{
 				this.sendBuffer.SendTo(buffer);	
 			}
-			
-			if (this.sendTimer == ObjectId.Empty)
+			if (this.isConnected)
 			{
-				this.sendTimer = this.service.Timer.Add(TimeHelper.Now() + SendInterval, this.StartSend);
+				this.SetStartSendFlag();
 			}
 		}
 
@@ -135,7 +163,7 @@ namespace TNet
 			tcs.SetResult(packet);
 		}
 
-		private async void StartSend()
+		public async void StartSend()
 		{
 			try
 			{
@@ -175,10 +203,9 @@ namespace TNet
 			{
 				while (true)
 				{
-					int n =
-							await
-									this.socket.RecvAsync(this.recvBuffer.Last, this.recvBuffer.LastIndex,
-											TBuffer.ChunkSize - this.recvBuffer.LastIndex);
+					int n = await this.socket.RecvAsync(
+						this.recvBuffer.Last, this.recvBuffer.LastIndex,
+								TBuffer.ChunkSize - this.recvBuffer.LastIndex);
 					if (n == 0)
 					{
 						break;

+ 29 - 19
CSharp/Platform/TNet/TService.cs

@@ -3,6 +3,7 @@ using System.Collections.Generic;
 using System.Linq;
 using System.Threading.Tasks;
 using Common.Base;
+using Common.Logger;
 using Common.Network;
 using MongoDB.Bson;
 
@@ -81,16 +82,6 @@ namespace TNet
 			return channel;
 		}
 
-		private async Task<AChannel> ConnectAsync(string host, int port)
-		{
-			TSocket newSocket = new TSocket(this.poller);
-			await newSocket.ConnectAsync(host, port);
-			TChannel channel = new TChannel(newSocket, this);
-			this.channels[newSocket.RemoteAddress] = channel;
-			this.idChannels[channel.Id] = channel;
-			return channel;
-		}
-
 		public async Task<AChannel> GetChannel()
 		{
 			if (this.acceptor == null)
@@ -105,11 +96,6 @@ namespace TNet
 			return channel;
 		}
 
-		public bool HasChannel(string address)
-		{
-			return this.channels.ContainsKey(address);
-		}
-
 		public void Remove(AChannel channel)
 		{
 			TChannel tChannel = channel as TChannel;
@@ -122,21 +108,45 @@ namespace TNet
 			this.timerManager.Remove(tChannel.SendTimer);
 		}
 
-		public async Task<AChannel> GetChannel(string host, int port)
+		private async void SocketConnectAsync(AChannel channel)
+		{
+			while (true)
+			{
+				try
+				{
+					await channel.ConnectAsync();
+					break;
+				}
+				catch (Exception e)
+				{
+					Log.Trace(e.ToString());
+				}
+				
+				await this.Timer.Sleep(5000);
+			}
+		}
+
+		public AChannel GetChannel(string host, int port)
 		{
 			TChannel channel = null;
 			if (this.channels.TryGetValue(host + ":" + port, out channel))
 			{
 				return channel;
 			}
-			return await this.ConnectAsync(host, port);
+
+			TSocket newSocket = new TSocket(this.poller);
+			channel = new TChannel(newSocket, host, port, this);
+			this.channels[channel.RemoteAddress] = channel;
+			this.idChannels[channel.Id] = channel;
+			this.SocketConnectAsync(channel);
+			return channel;
 		}
 
-		public async Task<AChannel> GetChannel(string address)
+		public AChannel GetChannel(string address)
 		{
 			string[] ss = address.Split(':');
 			int port = int.Parse(ss[1]);
-			return await this.GetChannel(ss[0], port);
+			return this.GetChannel(ss[0], port);
 		}
 
 		public void Update()

+ 48 - 3
CSharp/Platform/UNet/UChannel.cs

@@ -2,24 +2,41 @@
 using System.Collections.Generic;
 using System.Linq;
 using System.Threading.Tasks;
-using Common.Helper;
+using Common.Logger;
 using Common.Network;
-using MongoDB.Bson;
 
 namespace UNet
 {
+	internal class BufferInfo
+	{
+		public byte[] Buffer { get; set; }
+		public byte ChannelID { get; set; }
+		public PacketFlags Flags { get; set; }
+	}
+
 	internal class UChannel: AChannel
 	{
 		private USocket socket;
 		private readonly string remoteAddress;
+		private readonly Queue<BufferInfo> queue = new Queue<BufferInfo>();
+		private bool isConnected;
 
 		public UChannel(USocket socket, UService service): base(service)
 		{
+			this.isConnected = true;
 			this.socket = socket;
 			this.service = service;
 			this.remoteAddress = this.socket.RemoteAddress;
 		}
 
+		public UChannel(USocket socket, string host, int port, UService service)
+			: base(service)
+		{
+			this.socket = socket;
+			this.service = service;
+			this.remoteAddress = host + ":" + port;
+		}
+
 		private void Dispose(bool disposing)
 		{
 			if (this.socket == null)
@@ -50,14 +67,34 @@ namespace UNet
 			GC.SuppressFinalize(this);
 		}
 
+		public override async Task<bool> ConnectAsync()
+		{
+			string[] ss = this.RemoteAddress.Split(':');
+			int port = int.Parse(ss[1]);
+			bool result = await this.socket.ConnectAsync(ss[0], (ushort)port);
+			this.isConnected = true;
+			while (this.queue.Count > 0)
+			{
+				BufferInfo info = this.queue.Dequeue();
+				this.SendAsync(info.Buffer, info.ChannelID, info.Flags);
+			}
+			return result;
+		}
+
 		public override void SendAsync(
 				byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
 		{
+			if (!this.isConnected)
+			{
+				BufferInfo info = new BufferInfo { Buffer = buffer, ChannelID = channelID, Flags = flags };
+				this.queue.Enqueue(info);
+				return;
+			}
 			this.socket.SendAsync(buffer, channelID, flags);
 		}
 
 		public override void SendAsync(
-			List<byte[]> buffers, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
+				List<byte[]> buffers, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
 		{
 			int size = buffers.Select(b => b.Length).Sum();
 			byte[] buffer = new byte[size];
@@ -67,6 +104,14 @@ namespace UNet
 				Array.Copy(bytes, 0, buffer, index, bytes.Length);
 				index += bytes.Length;
 			}
+
+			if (!this.isConnected)
+			{
+				BufferInfo info = new BufferInfo { Buffer = buffer, ChannelID = channelID, Flags = flags };
+				this.queue.Enqueue(info);
+				return;
+			}
+
 			this.socket.SendAsync(buffer, channelID, flags);
 		}
 

+ 20 - 28
CSharp/Platform/UNet/UPoller.cs

@@ -20,7 +20,7 @@ namespace UNet
 
 		private IntPtr host;
 
-		private readonly USocket acceptor = new USocket(IntPtr.Zero);
+		private readonly USocket acceptor;
 
 		// 线程同步队列,发送接收socket回调都放到该队列,由poll线程统一执行
 		private readonly ConcurrentQueue<Action> concurrentQueue = new ConcurrentQueue<Action>();
@@ -31,6 +31,7 @@ namespace UNet
 
 		public UPoller(string hostName, ushort port)
 		{
+			this.acceptor = new USocket(IntPtr.Zero, this);
 			UAddress address = new UAddress(hostName, port);
 			ENetAddress nativeAddress = address.Struct;
 			this.host = NativeMethods.ENetHostCreate(ref nativeAddress,
@@ -78,6 +79,22 @@ namespace UNet
 			this.host = IntPtr.Zero;
 		}
 
+		public USocketManager USocketManager
+		{
+			get
+			{
+				return this.uSocketManager;
+			}
+		}
+
+		public IntPtr Host
+		{
+			get
+			{
+				return this.host;
+			}
+		}
+
 		public Task<USocket> AcceptAsync()
 		{
 			if (this.uSocketManager.ContainsKey(IntPtr.Zero))
@@ -93,7 +110,7 @@ namespace UNet
 				IntPtr ptr = this.connQueue.FirstKey;
 				this.connQueue.Remove(ptr);
 
-				USocket socket = new USocket(ptr);
+				USocket socket = new USocket(ptr, this);
 				this.uSocketManager.Add(ptr, socket);
 				tcs.TrySetResult(socket);
 			}
@@ -108,7 +125,7 @@ namespace UNet
 					}
 
 					this.uSocketManager.Remove(IntPtr.Zero);
-					USocket socket = new USocket(eEvent.Peer);
+					USocket socket = new USocket(eEvent.Peer, this);
 					this.uSocketManager.Add(socket.PeerPtr, socket);
 					tcs.TrySetResult(socket);
 				};
@@ -116,31 +133,6 @@ namespace UNet
 			return tcs.Task;
 		}
 
-		public Task<USocket> ConnectAsync(string hostName, ushort port)
-		{
-			var tcs = new TaskCompletionSource<USocket>();
-			UAddress address = new UAddress(hostName, port);
-			ENetAddress nativeAddress = address.Struct;
-
-			IntPtr ptr = NativeMethods.ENetHostConnect(this.host, ref nativeAddress,
-					NativeMethods.ENET_PROTOCOL_MAXIMUM_CHANNEL_COUNT, 0);
-			USocket socket = new USocket(ptr);
-			if (socket.PeerPtr == IntPtr.Zero)
-			{
-				throw new UException("host connect call failed.");
-			}
-			this.uSocketManager.Add(socket.PeerPtr, socket);
-			socket.Connected = eEvent =>
-			{
-				if (eEvent.Type == EventType.Disconnect)
-				{
-					tcs.TrySetException(new UException("socket disconnected in connect"));
-				}
-				tcs.TrySetResult(socket);
-			};
-			return tcs.Task;
-		}
-
 		private ENetEvent GetEvent()
 		{
 			if (this.eNetEventCache == null)

+ 26 - 15
CSharp/Platform/UNet/UService.cs

@@ -3,6 +3,7 @@ using System.Collections.Generic;
 using System.Linq;
 using System.Threading.Tasks;
 using Common.Base;
+using Common.Logger;
 using Common.Network;
 using MongoDB.Bson;
 
@@ -72,30 +73,45 @@ namespace UNet
 			this.poller.Add(action);
 		}
 
-		private async Task<AChannel> ConnectAsync(string host, int port)
+		private async void SocketConnectAsync(AChannel channel)
 		{
-			USocket newSocket = await this.poller.ConnectAsync(host, (ushort) port);
-			UChannel channel = new UChannel(newSocket, this);
-			this.channels[channel.RemoteAddress] = channel;
-			this.idChannels[channel.Id] = channel;
-			return channel;
+			while (true)
+			{
+				try
+				{
+					await channel.ConnectAsync();
+					break;
+				}
+				catch (Exception e)
+				{
+					Log.Trace(e.ToString());
+				}
+
+				await this.Timer.Sleep(5000);
+			}
 		}
 
-		public async Task<AChannel> GetChannel(string host, int port)
+		public AChannel GetChannel(string host, int port)
 		{
 			UChannel channel = null;
 			if (this.channels.TryGetValue(host + ":" + port, out channel))
 			{
 				return channel;
 			}
-			return await this.ConnectAsync(host, port);
+
+			USocket newSocket = new USocket(this.poller);
+			channel = new UChannel(newSocket, host, port, this);
+			this.channels[channel.RemoteAddress] = channel;
+			this.idChannels[channel.Id] = channel;
+			this.SocketConnectAsync(channel);
+			return channel;
 		}
 
-		public async Task<AChannel> GetChannel(string address)
+		public AChannel GetChannel(string address)
 		{
 			string[] ss = address.Split(':');
 			int port = int.Parse(ss[1]);
-			return await this.GetChannel(ss[0], port);
+			return this.GetChannel(ss[0], port);
 		}
 
 		public async Task<AChannel> GetChannel()
@@ -114,11 +130,6 @@ namespace UNet
 			return channel;
 		}
 
-		public bool HasChannel(string address)
-		{
-			return this.channels.ContainsKey(address);
-		}
-
 		public void Remove(AChannel channel)
 		{
 			UChannel tChannel = channel as UChannel;

+ 34 - 1
CSharp/Platform/UNet/USocket.cs

@@ -2,12 +2,14 @@
 using System.Collections.Generic;
 using System.Runtime.InteropServices;
 using System.Threading.Tasks;
+using Common.Logger;
 using Common.Network;
 
 namespace UNet
 {
 	internal sealed class USocket: IDisposable
 	{
+		private readonly UPoller poller;
 		private IntPtr peerPtr;
 		private readonly Queue<byte[]> recvQueue = new Queue<byte[]>();
 
@@ -26,11 +28,17 @@ namespace UNet
 			this.peerPtr = IntPtr.Zero;
 		}
 
-		public USocket(IntPtr peerPtr)
+		public USocket(IntPtr peerPtr, UPoller poller)
 		{
+			this.poller = poller;
 			this.peerPtr = peerPtr;
 		}
 
+		public USocket(UPoller poller)
+		{
+			this.poller = poller;
+		}
+
 		~USocket()
 		{
 			this.Dispose(false);
@@ -86,6 +94,31 @@ namespace UNet
 			NativeMethods.ENetPeerThrottleConfigure(this.peerPtr, interval, acceleration, deceleration);
 		}
 
+		public Task<bool> ConnectAsync(string hostName, ushort port)
+		{
+			var tcs = new TaskCompletionSource<bool>();
+			UAddress address = new UAddress(hostName, port);
+			ENetAddress nativeAddress = address.Struct;
+
+			this.peerPtr = NativeMethods.ENetHostConnect(this.poller.Host, ref nativeAddress,
+					NativeMethods.ENET_PROTOCOL_MAXIMUM_CHANNEL_COUNT, 0);
+			if (this.PeerPtr == IntPtr.Zero)
+			{
+				throw new UException("host connect call failed.");
+			}
+			this.poller.USocketManager.Add(this.PeerPtr, this);
+			this.Connected = eEvent =>
+			{
+				if (eEvent.Type == EventType.Disconnect)
+				{
+					tcs.TrySetException(new UException("socket disconnected in connect"));
+				}
+				Log.Debug("11111111111111, connect ok");
+				tcs.TrySetResult(true);
+			};
+			return tcs.Task;
+		}
+
 		public void SendAsync(byte[] data, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
 		{
 			UPacket packet = new UPacket(data, flags);

+ 1 - 3
CSharp/Test/TNetTest/TServiceTest.cs

@@ -19,7 +19,7 @@ namespace TNetTest
 
 		private async void ClientEvent(IService service, string hostName, ushort port)
 		{
-			AChannel channel = await service.GetChannel(hostName, port);
+			AChannel channel = service.GetChannel(hostName, port);
 			for (int i = 0; i < echoTimes; ++i)
 			{
 				channel.SendAsync("0123456789".ToByteArray());
@@ -72,8 +72,6 @@ namespace TNetTest
 					// 往server host线程增加事件,accept
 					serverService.Add(() => this.ServerEvent(serverService));
 
-					Thread.Sleep(1000);
-
 					// 往client host线程增加事件,client线程连接server
 					clientService.Add(() => this.ClientEvent(clientService, hostName, port));
 

+ 1 - 1
CSharp/Test/UNetTest/UServiceTest.cs

@@ -19,7 +19,7 @@ namespace UNetTest
 
 		private async void ClientEvent(IService clientService, string hostName, ushort port)
 		{
-			AChannel channel = await clientService.GetChannel(hostName, port);
+			AChannel channel = clientService.GetChannel(hostName, port);
 			for (int i = 0; i < echoTimes; ++i)
 			{
 				channel.SendAsync("0123456789".ToByteArray());