UChannel.cs 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.Sockets;
  5. using System.Threading.Tasks;
  6. namespace Base
  7. {
  /// <summary>
  /// Channel that sends and receives byte packets through a <see cref="USocket"/>.
  /// An <c>Id</c> of 0 is treated as "already disposed" by every public member.
  /// </summary>
  internal class UChannel: AChannel
  {
      private readonly USocket socket;

      // Completion source of the Recv() call that is currently waiting for a packet;
      // null while no caller is waiting.
      private TaskCompletionSource<byte[]> recvTcs;

      /// <summary>
      /// Creates a connecting channel: records the remote address as "host:port" and
      /// starts a connect on the given socket.
      /// </summary>
      /// <param name="socket">Socket used for all traffic of this channel.</param>
      /// <param name="host">Remote host to connect to.</param>
      /// <param name="port">Remote port; it is cast to <c>ushort</c> before being passed to the socket.</param>
      /// <param name="service">Owning service, also forwarded to the base constructor.</param>
      public UChannel(USocket socket, string host, int port, UService service): base(service, ChannelType.Connect)
      {
          this.socket = socket;
          this.service = service;
          this.RemoteAddress = host + ":" + port;

          // Subscribe before the connect is started so that an event raised while
          // connecting cannot be missed.
          this.SubscribeSocketEvents();

          // NOTE(review): whatever ConnectAsync returns is not observed here - confirm
          // that connect failures are reported through the Disconnect event.
          this.socket.ConnectAsync(host, (ushort)port);
      }

      /// <summary>
      /// Creates a channel around an accepted socket and takes the remote address from it.
      /// </summary>
      /// <param name="socket">Socket used for all traffic of this channel.</param>
      /// <param name="service">Owning service, also forwarded to the base constructor.</param>
      public UChannel(USocket socket, UService service) : base(service, ChannelType.Accept)
      {
          this.socket = socket;
          this.service = service;
          this.RemoteAddress = socket.RemoteAddress;
          this.SubscribeSocketEvents();
      }

      /// <summary>
      /// Disposes the base channel and then the socket. Does nothing when <c>Id</c> is
      /// already 0, which makes repeated calls harmless.
      /// </summary>
      public override void Dispose()
      {
          if (this.Id == 0)
          {
              return;
          }

          // presumably base.Dispose() resets Id to 0 - verify against AChannel
          base.Dispose();
          this.socket.Dispose();
      }

      /// <summary>
      /// Hands a single buffer to the socket for sending.
      /// </summary>
      /// <param name="buffer">Payload to send.</param>
      /// <param name="channelID">Channel id forwarded unchanged to the socket.</param>
      /// <param name="flags">Packet flags forwarded unchanged to the socket.</param>
      /// <exception cref="Exception">The channel has been disposed (<c>Id</c> is 0).</exception>
      public override void Send(byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
      {
          this.ThrowIfDisposed("UChannel已经被Dispose, 不能发送消息");
          this.socket.SendAsync(buffer, channelID, flags);
      }

      /// <summary>
      /// Concatenates the buffers, in list order, into one array and hands it to the
      /// socket as a single send.
      /// </summary>
      /// <param name="buffers">Payload fragments to join and send.</param>
      /// <param name="channelID">Channel id forwarded unchanged to the socket.</param>
      /// <param name="flags">Packet flags forwarded unchanged to the socket.</param>
      /// <exception cref="Exception">The channel has been disposed (<c>Id</c> is 0).</exception>
      public override void Send(List<byte[]> buffers, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable)
      {
          this.ThrowIfDisposed("UChannel已经被Dispose, 不能发送消息");

          int size = buffers.Sum(b => b.Length);
          var buffer = new byte[size];
          int index = 0;
          foreach (byte[] bytes in buffers)
          {
              Array.Copy(bytes, 0, buffer, index, bytes.Length);
              index += bytes.Length;
          }
          this.socket.SendAsync(buffer, channelID, flags);
      }

      /// <summary>
      /// Returns the next received packet. When the socket's receive queue already holds
      /// a packet the returned task is completed; otherwise the task completes once
      /// <see cref="OnRecv"/> delivers one.
      /// </summary>
      /// <returns>A task producing the packet bytes.</returns>
      /// <exception cref="Exception">The channel has been disposed (<c>Id</c> is 0).</exception>
      public override Task<byte[]> Recv()
      {
          this.ThrowIfDisposed("UChannel已经被Dispose, 不能接收消息");

          var recvQueue = this.socket.RecvQueue;
          if (recvQueue.Count > 0)
          {
              return Task.FromResult(recvQueue.Dequeue());
          }

          // Only one waiter is tracked: a second Recv() issued before the first one
          // completes replaces this field, and the earlier task is then never completed.
          recvTcs = new TaskCompletionSource<byte[]>();
          return recvTcs.Task;
      }

      /// <summary>
      /// Handler for the socket's Received event: completes the waiting
      /// <see cref="Recv"/> call, if any, with the next queued packet.
      /// </summary>
      private void OnRecv()
      {
          var tcs = this.recvTcs;
          if (tcs == null)
          {
              // Nobody is waiting; the packet stays queued for the next Recv().
              return;
          }

          var recvQueue = this.socket.RecvQueue;
          if (recvQueue.Count == 0)
          {
              // Nothing to deliver: keep the waiter registered instead of dropping it
              // and failing on an empty Dequeue().
              return;
          }

          // Clear the field before completing the task, so that a Recv() issued by the
          // awaiting caller while the task completes registers a fresh waiter that is
          // not wiped out afterwards.
          this.recvTcs = null;
          tcs.SetResult(recvQueue.Dequeue());
      }

      /// <summary>
      /// Wires the socket events to this channel: Received goes to <see cref="OnRecv"/>,
      /// Disconnect is reported through OnError as <see cref="SocketError.SocketError"/>.
      /// </summary>
      private void SubscribeSocketEvents()
      {
          this.socket.Received += this.OnRecv;
          this.socket.Disconnect += () => { this.OnError(this, SocketError.SocketError); };
      }

      /// <summary>
      /// Throws an <see cref="Exception"/> carrying <paramref name="message"/> when the
      /// channel's <c>Id</c> is 0.
      /// </summary>
      private void ThrowIfDisposed(string message)
      {
          if (this.Id == 0)
          {
              throw new Exception(message);
          }
      }
  }
  89. }