tanghai 12 лет назад
Родитель
Сommit
48bd323812

+ 25 - 21
CSharp/Platform/Zmq/ZmqPoller.cs → CSharp/Platform/Zmq/ZPoller.cs

@@ -1,24 +1,13 @@
 using System;
-using NetMQ;
+using ZeroMQ;
 
 namespace Zmq
 {
-	public class ZmqPoller: Poller
+	public class ZPoller: Poller
 	{
 		private readonly object eventsLock = new object();
-		private Action events;
-
-		public ZmqPoller()
-		{
-			AddTimer();
-		}
-
-		private void AddTimer()
-		{
-			var timer = new NetMQTimer(TimeSpan.FromMilliseconds(10));
-			timer.Elapsed += (sender, args) => this.OnEvents();
-			AddTimer(timer);
-		}
+		private Action events = () => {};
+		private bool isRunning = true;
 
 		public event Action Events
 		{
@@ -43,15 +32,30 @@ namespace Zmq
 			Action local = null;
 			lock (this.eventsLock)
 			{
-				if (this.events == null)
-				{
-					return;
-				}
 				local = this.events;
-				this.events = null;
+				this.events = () => {};
 			}
 			local();
-			AddTimer();
+		}
+
+		public void Add(ZSocket socket)
+		{
+			this.AddSocket(socket.ZmqSocket);
+		}
+
+		public void Start()
+		{
+			isRunning = true;
+			this.OnEvents();
+			while (isRunning)
+			{
+				this.Poll(TimeSpan.FromMilliseconds(0));
+			}
+		}
+
+		public void Stop()
+		{
+			isRunning = false;
 		}
 	}
 }

+ 21 - 16
CSharp/Platform/Zmq/ZmqSocket.cs → CSharp/Platform/Zmq/ZSocket.cs

@@ -1,54 +1,59 @@
 using System;
+using System.Text;
 using System.Threading.Tasks;
-using NetMQ;
+using ZeroMQ;
 
 namespace Zmq
 {
-	public class ZmqSocket: IDisposable
+	public class ZSocket: IDisposable
 	{
-		private ZmqPoller poller;
-		private readonly NetMQSocket socket;
+		private readonly ZmqSocket socket;
 
-		public ZmqSocket(ZmqPoller poller, NetMQSocket socket)
+		public ZSocket(ZmqSocket socket)
 		{
-			this.poller = poller;
 			this.socket = socket;
-			poller.AddSocket(this.socket);
+			this.socket.ReceiveReady += delegate { };
 		}
 
 		public void Dispose()
 		{
-			this.poller.RemoveSocket(this.socket);
 			this.socket.Dispose();
 		}
 
-		private EventHandler<NetMQSocketEventArgs> SendHandler { get; set; }
+		public ZmqSocket ZmqSocket
+		{
+			get
+			{
+				return this.socket;
+			}
+		}
+
+		private EventHandler<SocketEventArgs> SendHandler { get; set; }
 
-		private EventHandler<NetMQSocketEventArgs> RecvHandler { get; set; }
+		private EventHandler<SocketEventArgs> RecvHandler { get; set; }
 
-	    public Task<byte[]> RecvAsync()
+	    public Task<string> RecvAsync()
 	    {
-			var tcs = new TaskCompletionSource<byte[]>();
+			var tcs = new TaskCompletionSource<string>();
 
 			this.RecvHandler = (sender, args) =>
 		    {
-				bool hasMore = false;
 				args.Socket.ReceiveReady -= this.RecvHandler;
-				tcs.TrySetResult(args.Socket.Receive(true, out hasMore));
+				tcs.TrySetResult(args.Socket.Receive(Encoding.Unicode));
 		    };
 
 			this.socket.ReceiveReady += this.RecvHandler;
 		    return tcs.Task;
 	    }
 
-		public Task<bool> SendAsync(byte[] bytes)
+		public Task<bool> SendAsync(string str)
 		{
 			var tcs = new TaskCompletionSource<bool>();
 
 			this.SendHandler = (sender, args) =>
 			{
 				args.Socket.SendReady -= this.SendHandler;
-				this.socket.Send(bytes, bytes.Length, true);
+				this.socket.Send(str, Encoding.Unicode, TimeSpan.FromMilliseconds(0));
 				tcs.TrySetResult(true);
 			};
 			this.socket.SendReady += this.SendHandler;

+ 4 - 4
CSharp/Platform/Zmq/Zmq.csproj

@@ -32,8 +32,8 @@
     <WarningLevel>4</WarningLevel>
   </PropertyGroup>
   <ItemGroup>
-    <Reference Include="NetMQ">
-      <HintPath>..\..\packages\NetMQ.3.3.0.8\lib\net40\NetMQ.dll</HintPath>
+    <Reference Include="clrzmq">
+      <HintPath>..\..\packages\clrzmq.3.0.0-rc1\lib\net40\clrzmq.dll</HintPath>
     </Reference>
     <Reference Include="System" />
     <Reference Include="System.Core" />
@@ -44,8 +44,8 @@
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
-    <Compile Include="ZmqPoller.cs" />
-    <Compile Include="ZmqSocket.cs" />
+    <Compile Include="ZPoller.cs" />
+    <Compile Include="ZSocket.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
   </ItemGroup>
   <ItemGroup>

+ 1 - 1
CSharp/Platform/Zmq/packages.config

@@ -1,4 +1,4 @@
 <?xml version="1.0" encoding="utf-8"?>
 <packages>
-  <package id="NetMQ" version="3.3.0.8" targetFramework="net45" />
+  <package id="clrzmq" version="3.0.0-rc1" targetFramework="net45" />
 </packages>

+ 19 - 69
CSharp/Platform/ZmqTest/ZmqReqRepTest.cs

@@ -3,8 +3,8 @@ using System.Threading.Tasks;
 using Helper;
 using Log;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using NetMQ;
 using Zmq;
+using ZeroMQ;
 
 namespace ZmqTest
 {
@@ -13,63 +13,11 @@ namespace ZmqTest
 	{
 		const string address = "tcp://127.0.0.1:5001";
 
-		[TestMethod]
-		public void TestMethod()
-		{
-			var task1 = Task.Factory.StartNew(Server, TaskCreationOptions.LongRunning);
-			var task2 = Task.Factory.StartNew(Client, TaskCreationOptions.LongRunning);
-			Task.WaitAll(task1, task2);
-		}
-
-		private static void Client()
-		{
-			using (var context = NetMQContext.Create())
-			{
-				using (var req = context.CreateRequestSocket())
-				{
-					var poller = new Poller();
-					req.Connect(address);
-					req.ReceiveReady += (sender, args) =>
-					{
-						bool hasMore;
-						string msg = args.Socket.ReceiveString(true, out hasMore);
-						Logger.Debug(string.Format("req: {0}", msg));
-						poller.Stop();
-					};
-					req.Send("hello world!");
-					poller.AddSocket(req);
-					poller.Start();
-				}
-			}
-		}
-
-		private static void Server()
-		{
-			using (var context = NetMQContext.Create())
-			{
-				using (var rep = context.CreateResponseSocket())
-				{
-					var poller = new Poller();
-					poller.AddSocket(rep);
-					rep.Bind(address);
-					rep.ReceiveReady += (sender, args) =>
-					{
-						bool hasMore;
-						string msg = args.Socket.ReceiveString(true, out hasMore);
-						Logger.Debug(string.Format("rep: {0}", msg));
-						args.Socket.Send(msg, true);
-						poller.Stop();
-					};
-					poller.Start();
-				}
-			}
-		}
-
 		[TestMethod]
 		public void TestSendAsyncAndRecvAsync()
 		{
-			var clientPoller = new ZmqPoller();
-			var serverPoller = new ZmqPoller();
+			var clientPoller = new ZPoller();
+			var serverPoller = new ZPoller();
 
 			clientPoller.Events += () => Client2(clientPoller);
 
@@ -80,18 +28,19 @@ namespace ZmqTest
 			Task.WaitAll(task1, task2);
 		}
 
-		public static async Task Client2(ZmqPoller poller)
+		public static async Task Client2(ZPoller zPoller)
 		{
-			using (var context = NetMQContext.Create())
+			using (var context = ZmqContext.Create())
 			{
 				try
 				{
-					var socket = new ZmqSocket(poller, context.CreateRequestSocket());
+					var socket = new ZSocket(context.CreateSocket(SocketType.REP));
+					zPoller.Add(socket);
 					socket.Connect(address);
-					await socket.SendAsync("hello world".ToByteArray());
-					byte[] bytes = await socket.RecvAsync();
-					Logger.Debug(string.Format("client2: {0}", bytes.ToStr()));
-					await Task.Run(() => poller.Stop(false));
+					await socket.SendAsync("hello world");
+					string recvStr = await socket.RecvAsync();
+					Logger.Debug(string.Format("client2: {0}", recvStr));
+					zPoller.Stop();
 				}
 				catch (Exception e)
 				{
@@ -100,18 +49,19 @@ namespace ZmqTest
 			}	
 		}
 
-		public static async Task Server2(ZmqPoller poller)
+		public static async Task Server2(ZPoller zPoller)
 		{
-			using (var context = NetMQContext.Create())
+			using (var context = ZmqContext.Create())
 			{
 				try
 				{
-					var socket = new ZmqSocket(poller, context.CreateResponseSocket());
+					var socket = new ZSocket(context.CreateSocket(SocketType.REP));
+					zPoller.Add(socket);
 					socket.Bind(address);
-					byte[] bytes = await socket.RecvAsync();
-					Logger.Debug(string.Format("server2: {0}", bytes.ToStr()));
-					await socket.SendAsync("hello world".ToByteArray());
-					await Task.Run(() => poller.Stop(false));
+					string recvStr = await socket.RecvAsync();
+					Logger.Debug(string.Format("server2: {0}", recvStr));
+					await socket.SendAsync("hello world");
+					zPoller.Stop();
 				}
 				catch (Exception e)
 				{

+ 5 - 5
CSharp/Platform/ZmqTest/ZmqTest.csproj

@@ -37,8 +37,8 @@
     <WarningLevel>4</WarningLevel>
   </PropertyGroup>
   <ItemGroup>
-    <Reference Include="NetMQ">
-      <HintPath>..\..\packages\NetMQ.3.3.0.8\lib\net40\NetMQ.dll</HintPath>
+    <Reference Include="clrzmq">
+      <HintPath>..\..\packages\clrzmq.3.0.0-rc1\lib\net40\clrzmq.dll</HintPath>
     </Reference>
     <Reference Include="System" />
   </ItemGroup>
@@ -58,9 +58,6 @@
     <Compile Include="ZmqReqRepTest.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
   </ItemGroup>
-  <ItemGroup>
-    <None Include="packages.config" />
-  </ItemGroup>
   <ItemGroup>
     <ProjectReference Include="..\Helper\Helper.csproj">
       <Project>{24233cd5-a5df-484b-a482-b79cb7a0d9cb}</Project>
@@ -75,6 +72,9 @@
       <Name>Zmq</Name>
     </ProjectReference>
   </ItemGroup>
+  <ItemGroup>
+    <None Include="packages.config" />
+  </ItemGroup>
   <Choose>
     <When Condition="'$(VisualStudioVersion)' == '10.0' And '$(IsCodedUITest)' == 'True'">
       <ItemGroup>

+ 1 - 1
CSharp/Platform/ZmqTest/packages.config

@@ -1,4 +1,4 @@
 <?xml version="1.0" encoding="utf-8"?>
 <packages>
-  <package id="NetMQ" version="3.3.0.8" targetFramework="net45" />
+  <package id="clrzmq" version="3.0.0-rc1" targetFramework="net45" />
 </packages>