Przeglądaj źródła

初步封装netmq SendAsync RecvAsync,单元测试OK

tanghai 12 lat temu
rodzic
commit
7301e54ffc

+ 1 - 0
CSharp/Platform/Zmq/Zmq.csproj

@@ -44,6 +44,7 @@
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="ZmqPoller.cs" />
     <Compile Include="ZmqSocket.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
   </ItemGroup>

+ 57 - 0
CSharp/Platform/Zmq/ZmqPoller.cs

@@ -0,0 +1,57 @@
+using System;
+using NetMQ;
+
+namespace Zmq
+{
+	public class ZmqPoller: Poller
+	{
+		private readonly object eventsLock = new object();
+		private Action events;
+
+		public ZmqPoller()
+		{
+			AddTimer();
+		}
+
+		private void AddTimer()
+		{
+			var timer = new NetMQTimer(TimeSpan.FromMilliseconds(10));
+			timer.Elapsed += (sender, args) => this.OnEvents();
+			AddTimer(timer);
+		}
+
+		public event Action Events
+		{
+			add
+			{
+				lock (this.eventsLock)
+				{
+					this.events += value;
+				}
+			}
+			remove
+			{
+				lock (this.eventsLock)
+				{
+					this.events -= value;
+				}
+			}
+		}
+
+		private void OnEvents()
+		{
+			Action local = null;
+			lock (this.eventsLock)
+			{
+				if (this.events == null)
+				{
+					return;
+				}
+				local = this.events;
+				this.events = null;
+			}
+			local();
+			AddTimer();
+		}
+	}
+}

+ 25 - 6
CSharp/Platform/Zmq/ZmqSocket.cs

@@ -4,20 +4,29 @@ using NetMQ;
 
 namespace Zmq
 {
-	public class ZmqSocket
+	public class ZmqSocket: IDisposable
 	{
+		private ZmqPoller poller;
 		private readonly NetMQSocket socket;
 
-		public ZmqSocket(NetMQSocket socket)
+		public ZmqSocket(ZmqPoller poller, NetMQSocket socket)
 		{
+			this.poller = poller;
 			this.socket = socket;
+			poller.AddSocket(this.socket);
+		}
+
+		public void Dispose()
+		{
+			this.poller.RemoveSocket(this.socket);
+			this.socket.Dispose();
 		}
 
 		private EventHandler<NetMQSocketEventArgs> SendHandler { get; set; }
 
 		private EventHandler<NetMQSocketEventArgs> RecvHandler { get; set; }
 
-	    public Task<byte[]> Recv()
+	    public Task<byte[]> RecvAsync()
 	    {
 			var tcs = new TaskCompletionSource<byte[]>();
 
@@ -32,18 +41,28 @@ namespace Zmq
 		    return tcs.Task;
 	    }
 
-		public Task<bool> Send(byte[] bytes)
+		public Task<bool> SendAsync(byte[] bytes)
 		{
 			var tcs = new TaskCompletionSource<bool>();
 
 			this.SendHandler = (sender, args) =>
 			{
 				args.Socket.SendReady -= this.SendHandler;
+				this.socket.Send(bytes, bytes.Length, true);
 				tcs.TrySetResult(true);
 			};
 			this.socket.SendReady += this.SendHandler;
-			this.socket.Send(bytes, bytes.Length, true);
 			return tcs.Task;
 		}
-    }
+
+		public void Connect(string address)
+		{
+			this.socket.Connect(address);
+		}
+
+		public void Bind(string address)
+		{
+			this.socket.Bind(address);
+		}
+	}
 }

+ 61 - 3
CSharp/Platform/ZmqTest/ZmqReqRepTest.cs

@@ -1,7 +1,10 @@
-using System.Threading.Tasks;
+using System;
+using System.Threading.Tasks;
+using Helper;
 using Log;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using NetMQ;
+using Zmq;
 
 namespace ZmqTest
 {
@@ -13,8 +16,8 @@ namespace ZmqTest
 		[TestMethod]
 		public void TestMethod()
 		{
-			var task1 = Task.Factory.StartNew(Server);
-			var task2 = Task.Factory.StartNew(Client);
+			var task1 = Task.Factory.StartNew(Server, TaskCreationOptions.LongRunning);
+			var task2 = Task.Factory.StartNew(Client, TaskCreationOptions.LongRunning);
 			Task.WaitAll(task1, task2);
 		}
 
@@ -61,5 +64,60 @@ namespace ZmqTest
 				}
 			}
 		}
+
+		[TestMethod]
+		public void TestSendAsyncAndRecvAsync()
+		{
+			var clientPoller = new ZmqPoller();
+			var serverPoller = new ZmqPoller();
+
+			clientPoller.Events += () => Client2(clientPoller);
+
+			serverPoller.Events += () => Server2(serverPoller);
+
+			var task1 = Task.Factory.StartNew(clientPoller.Start, TaskCreationOptions.LongRunning);
+			var task2 = Task.Factory.StartNew(serverPoller.Start, TaskCreationOptions.LongRunning);
+			Task.WaitAll(task1, task2);
+		}
+
+		public static async Task Client2(ZmqPoller poller)
+		{
+			using (var context = NetMQContext.Create())
+			{
+				try
+				{
+					var socket = new ZmqSocket(poller, context.CreateRequestSocket());
+					socket.Connect(address);
+					await socket.SendAsync("hello world".ToByteArray());
+					byte[] bytes = await socket.RecvAsync();
+					Logger.Debug(string.Format("client2: {0}", bytes.ToStr()));
+					await Task.Run(() => poller.Stop(false));
+				}
+				catch (Exception e)
+				{
+					Logger.Debug(string.Format("exception: {0}", e.StackTrace));
+				}
+			}	
+		}
+
+		public static async Task Server2(ZmqPoller poller)
+		{
+			using (var context = NetMQContext.Create())
+			{
+				try
+				{
+					var socket = new ZmqSocket(poller, context.CreateResponseSocket());
+					socket.Bind(address);
+					byte[] bytes = await socket.RecvAsync();
+					Logger.Debug(string.Format("server2: {0}", bytes.ToStr()));
+					await socket.SendAsync("hello world".ToByteArray());
+					await Task.Run(() => poller.Stop(false));
+				}
+				catch (Exception e)
+				{
+					Logger.Debug(string.Format("exception2: {0}", e.StackTrace));
+				}
+			}	
+		}
 	}
 }

+ 8 - 0
CSharp/Platform/ZmqTest/ZmqTest.csproj

@@ -62,10 +62,18 @@
     <None Include="packages.config" />
   </ItemGroup>
   <ItemGroup>
+    <ProjectReference Include="..\Helper\Helper.csproj">
+      <Project>{24233cd5-a5df-484b-a482-b79cb7a0d9cb}</Project>
+      <Name>Helper</Name>
+    </ProjectReference>
     <ProjectReference Include="..\Log\Log.csproj">
       <Project>{72e16572-fc1f-4a9e-bc96-035417239298}</Project>
       <Name>Log</Name>
     </ProjectReference>
+    <ProjectReference Include="..\Zmq\Zmq.csproj">
+      <Project>{31a4af28-0988-4862-9d5a-69d34cdb50e6}</Project>
+      <Name>Zmq</Name>
+    </ProjectReference>
   </ItemGroup>
   <Choose>
     <When Condition="'$(VisualStudioVersion)' == '10.0' And '$(IsCodedUITest)' == 'True'">