ZmqReqRepTest.cs 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. using System;
  2. using System.Threading.Tasks;
  3. using Helper;
  4. using Log;
  5. using Microsoft.VisualStudio.TestTools.UnitTesting;
  6. using NetMQ;
  7. using Zmq;
  8. namespace ZmqTest
  9. {
  10. [TestClass]
  11. public class ZmqReqRepTest
  12. {
  13. const string address = "tcp://127.0.0.1:5001";
  14. [TestMethod]
  15. public void TestMethod()
  16. {
  17. var task1 = Task.Factory.StartNew(Server, TaskCreationOptions.LongRunning);
  18. var task2 = Task.Factory.StartNew(Client, TaskCreationOptions.LongRunning);
  19. Task.WaitAll(task1, task2);
  20. }
  21. private static void Client()
  22. {
  23. using (var context = NetMQContext.Create())
  24. {
  25. using (var req = context.CreateRequestSocket())
  26. {
  27. var poller = new Poller();
  28. req.Connect(address);
  29. req.ReceiveReady += (sender, args) =>
  30. {
  31. bool hasMore;
  32. string msg = args.Socket.ReceiveString(true, out hasMore);
  33. Logger.Debug(string.Format("req: {0}", msg));
  34. poller.Stop();
  35. };
  36. req.Send("hello world!");
  37. poller.AddSocket(req);
  38. poller.Start();
  39. }
  40. }
  41. }
  42. private static void Server()
  43. {
  44. using (var context = NetMQContext.Create())
  45. {
  46. using (var rep = context.CreateResponseSocket())
  47. {
  48. var poller = new Poller();
  49. poller.AddSocket(rep);
  50. rep.Bind(address);
  51. rep.ReceiveReady += (sender, args) =>
  52. {
  53. bool hasMore;
  54. string msg = args.Socket.ReceiveString(true, out hasMore);
  55. Logger.Debug(string.Format("rep: {0}", msg));
  56. args.Socket.Send(msg, true);
  57. poller.Stop();
  58. };
  59. poller.Start();
  60. }
  61. }
  62. }
  63. [TestMethod]
  64. public void TestSendAsyncAndRecvAsync()
  65. {
  66. var clientPoller = new ZmqPoller();
  67. var serverPoller = new ZmqPoller();
  68. clientPoller.Events += () => Client2(clientPoller);
  69. serverPoller.Events += () => Server2(serverPoller);
  70. var task1 = Task.Factory.StartNew(clientPoller.Start, TaskCreationOptions.LongRunning);
  71. var task2 = Task.Factory.StartNew(serverPoller.Start, TaskCreationOptions.LongRunning);
  72. Task.WaitAll(task1, task2);
  73. }
  74. public static async Task Client2(ZmqPoller poller)
  75. {
  76. using (var context = NetMQContext.Create())
  77. {
  78. try
  79. {
  80. var socket = new ZmqSocket(poller, context.CreateRequestSocket());
  81. socket.Connect(address);
  82. await socket.SendAsync("hello world".ToByteArray());
  83. byte[] bytes = await socket.RecvAsync();
  84. Logger.Debug(string.Format("client2: {0}", bytes.ToStr()));
  85. await Task.Run(() => poller.Stop(false));
  86. }
  87. catch (Exception e)
  88. {
  89. Logger.Debug(string.Format("exception: {0}", e.StackTrace));
  90. }
  91. }
  92. }
  93. public static async Task Server2(ZmqPoller poller)
  94. {
  95. using (var context = NetMQContext.Create())
  96. {
  97. try
  98. {
  99. var socket = new ZmqSocket(poller, context.CreateResponseSocket());
  100. socket.Bind(address);
  101. byte[] bytes = await socket.RecvAsync();
  102. Logger.Debug(string.Format("server2: {0}", bytes.ToStr()));
  103. await socket.SendAsync("hello world".ToByteArray());
  104. await Task.Run(() => poller.Stop(false));
  105. }
  106. catch (Exception e)
  107. {
  108. Logger.Debug(string.Format("exception2: {0}", e.StackTrace));
  109. }
  110. }
  111. }
  112. }
  113. }