Просмотр исходного кода

增加TServer和TSession两个类,TSession提供send和recv应用层缓冲区,应用层会先将数据读写到缓冲区.通过时间回调来触发真正的系统读写事件.如果应用层有大量小包,这样合并发送性能会非常高.

tanghai 11 лет назад
Родитель
Сommit
d326e8102f

+ 85 - 0
CSharp/Platform/Common/Base/TimerManager.cs

@@ -0,0 +1,85 @@
+using System;
+using System.Collections.Generic;
+using Common.Helper;
+using MongoDB.Bson;
+
+namespace Common.Base
+{
+	public class TimerManager
+	{
+		private class Timer
+		{
+			public ObjectId Id { get; set; }
+			public long Time { get; set; }
+			public Action Action { get; set; }
+		}
+
+		private readonly Dictionary<ObjectId, Timer> timers = new Dictionary<ObjectId, Timer>();
+
+		/// <summary>
+		/// key: time, value: timer id
+		/// </summary>
+		private readonly MultiMap<long, ObjectId> timeGuid = new MultiMap<long, ObjectId>();
+
+		public ObjectId Add(long time, Action action)
+		{
+			Timer timer = new Timer { Id = ObjectId.GenerateNewId(), Time = time, Action = action };
+			this.timers[timer.Id] = timer;
+			this.timeGuid.Add(timer.Time, timer.Id);
+			return timer.Id;
+		}
+
+		public void Update(ObjectId id, long time)
+		{
+			Timer timer;
+			if (!this.timers.TryGetValue(id, out timer))
+			{
+				return;
+			}
+
+			this.timeGuid.Remove(timer.Time, timer.Id);
+			timer.Time = time;
+			this.timeGuid.Add(timer.Time, timer.Id);
+		}
+
+		public void Remove(ObjectId id)
+		{
+			Timer timer;
+			if (!this.timers.TryGetValue(id, out timer))
+			{
+				return;
+			}
+			this.timers.Remove(timer.Id);
+			this.timeGuid.Remove(timer.Time, timer.Id);
+		}
+
+		public void Refresh()
+		{
+			long timeNow = TimeHelper.Now();
+			var timeoutTimer = new List<long>();
+			foreach (long time in this.timeGuid.Keys)
+			{
+				if (time > timeNow)
+				{
+					break;
+				}
+				timeoutTimer.Add(time);
+			}
+
+			foreach (long key in timeoutTimer)
+			{
+				ObjectId[] timeoutIds = this.timeGuid.GetAll(key);
+				foreach (ObjectId id in timeoutIds)
+				{
+					Timer timer;
+					if (!this.timers.TryGetValue(id, out timer))
+					{
+						continue;
+					}
+					this.Remove(id);
+					timer.Action();
+				}
+			}
+		}
+	}
+}

+ 1 - 0
CSharp/Platform/Common/Common.csproj

@@ -58,6 +58,7 @@
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="Base\TimerManager.cs" />
     <Compile Include="Base\Entity.cs" />
     <Compile Include="Base\Component.cs" />
     <Compile Include="Base\MultiMap.cs" />

+ 2 - 3
CSharp/Platform/TNet/IPoller.cs

@@ -1,4 +1,5 @@
 using System;
+using MongoDB.Bson;
 
 namespace TNet
 {
@@ -6,8 +7,6 @@ namespace TNet
 	{
 		void Add(Action action);
 
-		void RunOnce(int timeout);
-
-		void Run();
+		void Run(int timeout);
 	}
 }

+ 22 - 14
CSharp/Platform/TNet/TBuffer.cs

@@ -39,6 +39,10 @@ namespace TNet
 		{
 			get
 			{
+				if (this.bufferList.First == null)
+				{
+					this.AddLast();
+				}
 				return this.bufferList.First.Value;
 			}
 		}
@@ -47,26 +51,29 @@ namespace TNet
 		{
 			get
 			{
+				if (this.bufferList.Last == null)
+				{
+					this.AddLast();
+				}
 				return this.bufferList.Last.Value;
 			}
 		}
 
 		public void RecvFrom(byte[] buffer)
 		{
-			int n = buffer.Length;
-			if (this.Count < n || n <= 0)
+			if (this.Count < buffer.Length || buffer.Length == 0)
 			{
-				throw new Exception(string.Format("bufferList size < n, bufferList: {0} n: {1}", this.Count, n));
+				throw new Exception(string.Format("bufferList size < n, bufferList: {0} buffer length: {1}", this.Count, buffer.Length));
 			}
 			int alreadyCopyCount = 0;
-			while (alreadyCopyCount < n)
+			while (alreadyCopyCount < buffer.Length)
 			{
-				if (ChunkSize - this.FirstIndex > n - alreadyCopyCount)
+				int n = buffer.Length - alreadyCopyCount;
+				if (ChunkSize - this.FirstIndex > n)
 				{ 
-					Array.Copy(this.bufferList.First.Value, this.FirstIndex, buffer, alreadyCopyCount,
-							n - alreadyCopyCount);
-					this.FirstIndex += n - alreadyCopyCount;
-					alreadyCopyCount = n;
+					Array.Copy(this.bufferList.First.Value, this.FirstIndex, buffer, alreadyCopyCount, n);
+					this.FirstIndex += n;
+					alreadyCopyCount += n;
 				}
 				else
 				{
@@ -88,17 +95,18 @@ namespace TNet
 				{
 					this.bufferList.AddLast(new byte[ChunkSize]);
 				}
-				if (ChunkSize - this.LastIndex > alreadyCopyCount)
+				int n = buffer.Length - alreadyCopyCount;
+				if (ChunkSize - this.LastIndex > n)
 				{
-					Array.Copy(buffer, alreadyCopyCount, this.bufferList.Last.Value, this.LastIndex, alreadyCopyCount);
-					this.LastIndex += alreadyCopyCount;
-					alreadyCopyCount = 0;
+					Array.Copy(buffer, alreadyCopyCount, this.bufferList.Last.Value, this.LastIndex, n);
+					this.LastIndex += buffer.Length - alreadyCopyCount;
+					alreadyCopyCount += n;
 				}
 				else
 				{
 					Array.Copy(buffer, alreadyCopyCount, this.bufferList.Last.Value, this.LastIndex,
 							ChunkSize - this.LastIndex);
-					alreadyCopyCount -= ChunkSize - this.LastIndex;
+					alreadyCopyCount += ChunkSize - this.LastIndex;
 					this.LastIndex = 0;
 				}
 			}

+ 19 - 0
CSharp/Platform/TNet/TNet.csproj

@@ -33,6 +33,14 @@
     <WarningLevel>4</WarningLevel>
   </PropertyGroup>
   <ItemGroup>
+    <Reference Include="MongoDB.Bson, Version=1.10.0.277, Culture=neutral, PublicKeyToken=f686731cfb9cc103, processorArchitecture=MSIL">
+      <SpecificVersion>False</SpecificVersion>
+      <HintPath>..\..\packages\mongocsharpdriver.1.10.0-rc0\lib\net35\MongoDB.Bson.dll</HintPath>
+    </Reference>
+    <Reference Include="MongoDB.Driver, Version=1.10.0.277, Culture=neutral, PublicKeyToken=f686731cfb9cc103, processorArchitecture=MSIL">
+      <SpecificVersion>False</SpecificVersion>
+      <HintPath>..\..\packages\mongocsharpdriver.1.10.0-rc0\lib\net35\MongoDB.Driver.dll</HintPath>
+    </Reference>
     <Reference Include="System" />
     <Reference Include="System.Core" />
     <Reference Include="System.Xml.Linq" />
@@ -46,8 +54,19 @@
     <Compile Include="TPoller.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="TBuffer.cs" />
+    <Compile Include="TServer.cs" />
+    <Compile Include="TSession.cs" />
     <Compile Include="TSocket.cs" />
   </ItemGroup>
+  <ItemGroup>
+    <ProjectReference Include="..\Common\Common.csproj">
+      <Project>{19f8f043-1f99-4550-99df-dea5c7d77e55}</Project>
+      <Name>Common</Name>
+    </ProjectReference>
+  </ItemGroup>
+  <ItemGroup>
+    <None Include="packages.config" />
+  </ItemGroup>
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
   <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">

+ 17 - 24
CSharp/Platform/TNet/TPoller.cs

@@ -1,6 +1,8 @@
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
+using Common.Base;
+using MongoDB.Bson;
 
 namespace TNet
 {
@@ -18,39 +20,30 @@ namespace TNet
 		{
 		}
 
-		public void RunOnce(int timeout)
+		public void Run(int timeout)
 		{
 			// 处理读写线程的回调
 			Action action;
-			if (!this.blockingCollection.TryTake(out action, timeout))
+			if (this.blockingCollection.TryTake(out action, timeout))
 			{
-				return;
-			}
 
-			var queue = new Queue<Action>();
-			queue.Enqueue(action);
+				var queue = new Queue<Action>();
+				queue.Enqueue(action);
 
-			while (true)
-			{
-				if (!this.blockingCollection.TryTake(out action, 0))
+				while (true)
 				{
-					break;
+					if (!this.blockingCollection.TryTake(out action, 0))
+					{
+						break;
+					}
+					queue.Enqueue(action);
 				}
-				queue.Enqueue(action);
-			}
 
-			while (queue.Count > 0)
-			{
-				Action a = queue.Dequeue();
-				a();
-			}
-		}
-
-		public void Run()
-		{
-			while (true)
-			{
-				this.RunOnce(1);
+				while (queue.Count > 0)
+				{
+					Action a = queue.Dequeue();
+					a();
+				}
 			}
 		}
 	}

+ 113 - 0
CSharp/Platform/TNet/TServer.cs

@@ -0,0 +1,113 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Common.Base;
+using MongoDB.Bson;
+
+namespace TNet
+{
+	public class TServer: IDisposable
+	{
+		private IPoller poller = new TPoller();
+		private TSocket acceptor;
+
+		private readonly Dictionary<string, TSession> sessions = new Dictionary<string, TSession>();
+
+		private readonly TimerManager timerManager = new TimerManager();
+		
+		public TServer(int port)
+		{
+			this.acceptor = new TSocket(poller);
+			this.acceptor.Bind("127.0.0.1", port);
+			this.acceptor.Listen(100);
+		}
+
+		public void Dispose()
+		{
+			if (this.poller == null)
+			{
+				return;
+			}
+			
+			this.acceptor.Dispose();
+			this.acceptor = null;
+			this.poller = null;
+		}
+
+		public void Remove(string address)
+		{
+			TSession session;
+			if (!this.sessions.TryGetValue(address, out session))
+			{
+				return;
+			}
+			if (session.SendTimer != ObjectId.Empty)
+			{
+				this.Timer.Remove(session.SendTimer);
+			}
+
+			if (session.RecvTimer != ObjectId.Empty)
+			{
+				this.Timer.Remove(session.RecvTimer);
+			}
+			this.sessions.Remove(address);
+		}
+
+		public void Push(Action action)
+		{
+			this.poller.Add(action);
+		}
+
+		public async Task<TSession> AcceptAsync()
+		{
+			TSocket newSocket = new TSocket(poller);
+			await this.acceptor.AcceptAsync(newSocket);
+			TSession session = new TSession(newSocket, this);
+			sessions[newSocket.RemoteAddress] = session;
+			return session;
+		}
+
+		public async Task<TSession> ConnectAsync(string host, int port)
+		{
+			TSocket newSocket = new TSocket(poller);
+			await newSocket.ConnectAsync(host, port);
+			TSession session = new TSession(newSocket, this);
+			sessions[newSocket.RemoteAddress] = session;
+			return session;
+		}
+
+		public TSession Get(string host, int port)
+		{
+			TSession session = null;
+			this.sessions.TryGetValue(host + ":" + port, out session);
+			return session;
+		}
+
+		public async Task<TSession> GetOrCreate(string host, int port)
+		{
+			TSession session = null;
+			if (this.sessions.TryGetValue(host + ":" + port, out session))
+			{
+				return session;
+			}
+			return await ConnectAsync(host, port);
+		}
+
+		public void Start()
+		{
+			while (true)
+			{
+				poller.Run(1);
+				this.timerManager.Refresh();
+			}
+		}
+
+		public TimerManager Timer
+		{
+			get
+			{
+				return this.timerManager;
+			}
+		}
+	}
+}

+ 154 - 0
CSharp/Platform/TNet/TSession.cs

@@ -0,0 +1,154 @@
+using System;
+using Common.Helper;
+using Common.Logger;
+using MongoDB.Bson;
+
+namespace TNet
+{
+	public class TSession: IDisposable
+	{
+		private const int RecvSendInterval = 100;
+		private readonly TServer server;
+		private TSocket socket;
+		private readonly TBuffer recvBuffer = new TBuffer();
+		private readonly TBuffer sendBuffer = new TBuffer();
+		public ObjectId SendTimer = ObjectId.Empty;
+		public ObjectId RecvTimer = ObjectId.Empty;
+		private event Action onRecv = () => { };
+		private event Action onSend = () => { };
+
+		public event Action OnRecv
+		{
+			add
+			{
+				this.onRecv += value;
+			}
+			remove
+			{
+				this.onRecv -= value;
+			} 
+		}
+		public event Action OnSend
+		{
+			add
+			{
+				this.onSend += value;
+			}
+			remove
+			{
+				this.onSend -= value;
+			}
+		}
+
+		public TSession(TSocket socket, TServer server)
+		{
+			this.socket = socket;
+			this.server = server;
+		}
+
+		public void Dispose()
+		{
+			if (this.socket == null)
+			{
+				return;
+			}
+			this.server.Remove(socket.RemoteAddress);
+			this.socket.Dispose();
+			this.socket = null;
+		}
+
+		public void Send(byte[] buffer)
+		{
+			this.sendBuffer.SendTo(buffer);
+			if (this.SendTimer == ObjectId.Empty)
+			{
+				this.SendTimer = this.server.Timer.Add(TimeHelper.Now() + RecvSendInterval, this.SendTimerCallback);
+			}
+		}
+
+		private async void SendTimerCallback()
+		{
+			try
+			{
+				while (true)
+				{
+					if (this.sendBuffer.Count == 0)
+					{
+						break;
+					}
+					int sendSize = TBuffer.ChunkSize - this.sendBuffer.FirstIndex;
+					if (sendSize > this.sendBuffer.Count)
+					{
+						sendSize = this.sendBuffer.Count;
+					}
+					int n = await this.socket.SendAsync(
+						this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
+					this.sendBuffer.FirstIndex += n;
+					if (this.sendBuffer.FirstIndex == TBuffer.ChunkSize)
+					{
+						this.sendBuffer.FirstIndex = 0;
+						this.sendBuffer.RemoveFirst();
+					}
+				}
+			}
+			catch (Exception e)
+			{
+				Log.Trace(e.ToString());
+			}
+
+			this.onSend();
+			this.SendTimer = ObjectId.Empty;
+		}
+
+		public int RecvSize
+		{
+			get
+			{
+				return this.recvBuffer.Count;
+			}
+		}
+
+		public void Recv(byte[] buffer)
+		{
+			this.recvBuffer.RecvFrom(buffer);
+		}
+
+		public async void Start()
+		{
+			try
+			{
+				while (true)
+				{
+					int n = await this.socket.RecvAsync(
+						this.recvBuffer.Last, this.recvBuffer.LastIndex, TBuffer.ChunkSize - this.recvBuffer.LastIndex);
+					if (n == 0)
+					{
+						break;
+					}
+
+					this.recvBuffer.LastIndex += n;
+					if (this.recvBuffer.LastIndex == TBuffer.ChunkSize)
+					{
+						this.recvBuffer.AddLast();
+						this.recvBuffer.LastIndex = 0;
+					}
+
+					if (this.RecvTimer == ObjectId.Empty)
+					{
+						this.RecvTimer = this.server.Timer.Add(TimeHelper.Now() + RecvSendInterval, this.RecvTimerCallback);
+					}
+				}
+			}
+			catch (Exception e)
+			{
+				Log.Trace(e.ToString());
+			}
+		}
+
+		private void RecvTimerCallback()
+		{
+			this.onRecv();
+			this.RecvTimer = ObjectId.Empty;
+		}
+	}
+}

+ 69 - 26
CSharp/Platform/TNet/TSocket.cs

@@ -9,13 +9,31 @@ namespace TNet
 	{
 		private IPoller poller;
 		private readonly Socket socket;
-		private readonly SocketAsyncEventArgs socketAsyncEventArgs = new SocketAsyncEventArgs();
+		private readonly SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();
+		private readonly SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs();
 
 		public TSocket(IPoller poller)
 		{
 			this.poller = poller;
 			this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
-			this.socketAsyncEventArgs.Completed += this.OnComplete;
+			this.innArgs.Completed += this.OnComplete;
+			this.outArgs.Completed += this.OnComplete;
+		}
+
+		public IPoller Poller
+		{
+			get
+			{
+				return this.poller;
+			}
+		}
+
+		public string RemoteAddress
+		{
+			get
+			{
+				return ((IPEndPoint)socket.RemoteEndPoint).Address + ":" + ((IPEndPoint)socket.RemoteEndPoint).Port;
+			}
 		}
 
 		public Socket Socket
@@ -53,7 +71,6 @@ namespace TNet
 			{
 				case SocketAsyncOperation.Accept:
 					action = () => OnAcceptComplete(e);
-					e.AcceptSocket = null;
 					break;
 				case SocketAsyncOperation.Connect:
 					action = () => OnConnectComplete(e);
@@ -77,11 +94,11 @@ namespace TNet
 		public Task<bool> ConnectAsync(string host, int port)
 		{
 			var tcs = new TaskCompletionSource<bool>();
-			this.socketAsyncEventArgs.UserToken = tcs;
-			this.socketAsyncEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(host), port);
-			if (!this.socket.ConnectAsync(this.socketAsyncEventArgs))
+			this.outArgs.UserToken = tcs;
+			this.outArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(host), port);
+			if (!this.socket.ConnectAsync(this.outArgs))
 			{
-				this.poller.Add(() => { OnConnectComplete(this.socketAsyncEventArgs); });
+				OnConnectComplete(this.outArgs);
 			}
 			return tcs.Task;
 		}
@@ -89,18 +106,23 @@ namespace TNet
 		private static void OnConnectComplete(SocketAsyncEventArgs e)
 		{
 			var tcs = (TaskCompletionSource<bool>)e.UserToken;
+			e.UserToken = null;
+			if (e.SocketError != SocketError.Success)
+			{
+				tcs.SetException(new Exception(string.Format("socket error: {0}", e.SocketError)));
+				return;
+			}
 			tcs.SetResult(true);
 		}
 
 		public Task<bool> AcceptAsync(TSocket accpetSocket)
 		{
 			var tcs = new TaskCompletionSource<bool>();
-			this.socketAsyncEventArgs.UserToken = tcs;
-			this.socketAsyncEventArgs.AcceptSocket = accpetSocket.socket;
-			if (!this.socket.AcceptAsync(this.socketAsyncEventArgs))
+			this.innArgs.UserToken = tcs;
+			this.innArgs.AcceptSocket = accpetSocket.socket;
+			if (!this.socket.AcceptAsync(this.innArgs))
 			{
-				Action action = () => OnAcceptComplete(this.socketAsyncEventArgs);
-				this.poller.Add(action);
+				OnAcceptComplete(this.innArgs);
 			}
 			return tcs.Task;
 		}
@@ -108,18 +130,23 @@ namespace TNet
 		private static void OnAcceptComplete(SocketAsyncEventArgs e)
 		{
 			var tcs = (TaskCompletionSource<bool>)e.UserToken;
+			e.UserToken = null;
+			if (e.SocketError != SocketError.Success)
+			{
+				tcs.SetException(new Exception(string.Format("socket error: {0}", e.SocketError)));
+				return;
+			}
 			tcs.SetResult(true);
 		}
 
 		public Task<int> RecvAsync(byte[] buffer, int offset, int count)
 		{
 			var tcs = new TaskCompletionSource<int>();
-			this.socketAsyncEventArgs.UserToken = tcs;
-			this.socketAsyncEventArgs.SetBuffer(buffer, offset, count);
-			if (!this.socket.ReceiveAsync(this.socketAsyncEventArgs))
+			this.innArgs.UserToken = tcs;
+			this.innArgs.SetBuffer(buffer, offset, count);
+			if (!this.socket.ReceiveAsync(this.innArgs))
 			{
-				Action action = () => OnRecvComplete(this.socketAsyncEventArgs);
-				this.poller.Add(action);
+				OnRecvComplete(this.innArgs);
 			}
 			return tcs.Task;
 		}
@@ -127,18 +154,23 @@ namespace TNet
 		private static void OnRecvComplete(SocketAsyncEventArgs e)
 		{
 			var tcs = (TaskCompletionSource<int>)e.UserToken;
+			e.UserToken = null;
+			if (e.SocketError != SocketError.Success)
+			{
+				tcs.SetException(new Exception(string.Format("socket error: {0}", e.SocketError)));
+				return;
+			}
 			tcs.SetResult(e.BytesTransferred);
 		}
 
 		public Task<int> SendAsync(byte[] buffer, int offset, int count)
 		{
 			var tcs = new TaskCompletionSource<int>();
-			this.socketAsyncEventArgs.UserToken = tcs;
-			this.socketAsyncEventArgs.SetBuffer(buffer, offset, count);
-			if (!this.socket.SendAsync(this.socketAsyncEventArgs))
+			this.outArgs.UserToken = tcs;
+			this.outArgs.SetBuffer(buffer, offset, count);
+			if (!this.socket.SendAsync(this.outArgs))
 			{
-				Action action = () => OnSendComplete(this.socketAsyncEventArgs);
-				this.poller.Add(action);
+				OnSendComplete(this.outArgs);
 			}
 			return tcs.Task;
 		}
@@ -146,17 +178,22 @@ namespace TNet
 		private static void OnSendComplete(SocketAsyncEventArgs e)
 		{
 			var tcs = (TaskCompletionSource<int>)e.UserToken;
+			e.UserToken = null;
+			if (e.SocketError != SocketError.Success)
+			{
+				tcs.SetException(new Exception(string.Format("socket error: {0}", e.SocketError)));
+				return;
+			}
 			tcs.SetResult(e.BytesTransferred);
 		}
 
 		public Task<bool> DisconnectAsync()
 		{
 			var tcs = new TaskCompletionSource<bool>();
-			this.socketAsyncEventArgs.UserToken = tcs;
-			if (!this.socket.DisconnectAsync(this.socketAsyncEventArgs))
+			this.outArgs.UserToken = tcs;
+			if (!this.socket.DisconnectAsync(this.outArgs))
 			{
-				Action action = () => OnDisconnectComplete(this.socketAsyncEventArgs);
-				this.poller.Add(action);
+				OnDisconnectComplete(this.outArgs);
 			}
 			return tcs.Task;
 		}
@@ -164,6 +201,12 @@ namespace TNet
 		private static void OnDisconnectComplete(SocketAsyncEventArgs e)
 		{
 			var tcs = (TaskCompletionSource<bool>)e.UserToken;
+			e.UserToken = null;
+			if (e.SocketError != SocketError.Success)
+			{
+				tcs.SetException(new Exception(string.Format("socket error: {0}", e.SocketError)));
+				return;
+			}
 			tcs.SetResult(true);
 		}
 	}

+ 4 - 0
CSharp/Platform/TNet/packages.config

@@ -0,0 +1,4 @@
+<?xml version="1.0" encoding="utf-8"?>
+<packages>
+  <package id="mongocsharpdriver" version="1.10.0-rc0" targetFramework="net451" />
+</packages>

+ 1 - 0
CSharp/Platform/TNetTest/TNetTest.csproj

@@ -52,6 +52,7 @@
   </Choose>
   <ItemGroup>
     <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="TServerTest.cs" />
     <Compile Include="TSocketTest.cs" />
     <Compile Include="TcpListenerTest.cs" />
   </ItemGroup>

+ 89 - 0
CSharp/Platform/TNetTest/TServerTest.cs

@@ -0,0 +1,89 @@
+using System.Threading;
+using System.Threading.Tasks;
+using Common.Helper;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using TNet;
+
+namespace TNetTest
+{
+	[TestClass]
+	public class TServerTest
+	{
+		private Barrier barrier;
+		private const int clientNum = 100;
+
+		private const int sendNum = 10000;
+
+		[TestMethod]
+		public void SendRecv()
+		{
+			barrier = new Barrier(clientNum + 1);
+			TServer server = new TServer(10000);
+			Task.Factory.StartNew(() => server.Start(), TaskCreationOptions.LongRunning);
+
+			server.Push(() => Server(server));
+			Thread.Sleep(1000);
+
+			for (int i = 0; i < clientNum; ++i)
+			{
+				server.Push(() => this.ClientRequest(server));
+			}
+
+			this.barrier.SignalAndWait();
+		}
+
+		private async void Server(TServer server)
+		{
+			for (int i = 0; i < clientNum; i++)
+			{
+				TSession session = await server.AcceptAsync();
+				int count = 0;
+				session.OnRecv += () => this.ServerResponse(session, ref count);
+				session.Start();
+			}
+		}
+
+		private void ServerResponse(TSession session, ref int count)
+		{
+			byte[] buffer = new byte[10];
+			while (session.RecvSize >= 10)
+			{
+				buffer = new byte[10];
+				session.Recv(buffer);
+				Assert.AreEqual("0123456789", buffer.ToStr());
+				++count;
+			}
+
+			if (count == sendNum)
+			{
+				buffer = "9876543210".ToByteArray();
+				session.Send(buffer);
+			}
+		}
+
+		private async void ClientRequest(TServer server)
+		{
+			TSession session = await server.ConnectAsync("127.0.0.1", 10000);
+			session.OnRecv += () => ClientOnResponse(session);
+			session.Start();
+
+			byte[] buffer = "0123456789".ToByteArray();
+			for (int i = 0; i < sendNum; i++)
+			{
+				session.Send(buffer);
+			}
+		}
+
+		private void ClientOnResponse(TSession session)
+		{
+			if (session.RecvSize < 10)
+			{
+				return;
+			}
+			byte[] buffer = new byte[10];
+			session.Recv(buffer);
+			Assert.AreEqual("9876543210", buffer.ToStr());
+			this.barrier.RemoveParticipant();
+		}
+	}
+}

+ 9 - 1
CSharp/Platform/TNetTest/TSocketTest.cs

@@ -19,7 +19,15 @@ namespace TNetTest
 		{
 			barrier = new Barrier(clientNum + 2);
 			IPoller poller = new TPoller();
-			Task.Factory.StartNew(() => poller.Run(), TaskCreationOptions.LongRunning);
+			Task.Factory.StartNew(() =>
+			{
+				while (true)
+				{
+					poller.Run(1);
+				}
+			}, 
+			
+			TaskCreationOptions.LongRunning);
 
 			poller.Add(() => Server(poller));
 			Thread.Sleep(500);