ZmqSocket.cs 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. using System;
  2. using System.Threading.Tasks;
  3. using NetMQ;
  4. namespace Zmq
  5. {
  6. public class ZmqSocket: IDisposable
  7. {
  8. private ZmqPoller poller;
  9. private readonly NetMQSocket socket;
  10. public ZmqSocket(ZmqPoller poller, NetMQSocket socket)
  11. {
  12. this.poller = poller;
  13. this.socket = socket;
  14. poller.AddSocket(this.socket);
  15. }
  16. public void Dispose()
  17. {
  18. this.poller.RemoveSocket(this.socket);
  19. this.socket.Dispose();
  20. }
  21. private EventHandler<NetMQSocketEventArgs> SendHandler { get; set; }
  22. private EventHandler<NetMQSocketEventArgs> RecvHandler { get; set; }
  23. public Task<byte[]> RecvAsync()
  24. {
  25. var tcs = new TaskCompletionSource<byte[]>();
  26. this.RecvHandler = (sender, args) =>
  27. {
  28. bool hasMore = false;
  29. args.Socket.ReceiveReady -= this.RecvHandler;
  30. tcs.TrySetResult(args.Socket.Receive(true, out hasMore));
  31. };
  32. this.socket.ReceiveReady += this.RecvHandler;
  33. return tcs.Task;
  34. }
  35. public Task<bool> SendAsync(byte[] bytes)
  36. {
  37. var tcs = new TaskCompletionSource<bool>();
  38. this.SendHandler = (sender, args) =>
  39. {
  40. args.Socket.SendReady -= this.SendHandler;
  41. this.socket.Send(bytes, bytes.Length, true);
  42. tcs.TrySetResult(true);
  43. };
  44. this.socket.SendReady += this.SendHandler;
  45. return tcs.Task;
  46. }
  47. public void Connect(string address)
  48. {
  49. this.socket.Connect(address);
  50. }
  51. public void Bind(string address)
  52. {
  53. this.socket.Bind(address);
  54. }
  55. }
  56. }