TSocket.cs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. using System;
  2. using System.Net;
  3. using System.Net.Sockets;
  4. using System.Threading.Tasks;
  5. namespace Base
  6. {
  /// <summary>
  /// Wraps a <see cref="System.Net.Sockets.Socket"/> driven by two reusable
  /// <see cref="SocketAsyncEventArgs"/> instances. Completions raised through the
  /// <c>Completed</c> event are queued on the <c>TPoller</c> supplied to the constructor
  /// (original author's note: callbacks are pushed to the main thread for handling).
  /// </summary>
  public class TSocket: IDisposable
  {
      private readonly TPoller poller;
      private Socket socket;

      // innArgs is used for accept and receive operations, outArgs for connect and send.
      private readonly SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();
      private readonly SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs();

      /// <summary>Invoked with the socket error code when a connect operation completes.</summary>
      public Action<SocketError> OnConn;

      /// <summary>Invoked with the bytes transferred and the socket error code when a receive completes.</summary>
      public Action<int, SocketError> OnRecv;

      /// <summary>Invoked with the bytes transferred and the socket error code when a send completes.</summary>
      public Action<int, SocketError> OnSend;

      /// <summary>Invoked with the socket error code when a disconnect operation completes.</summary>
      public Action<SocketError> OnDisconnect;

      /// <summary>
      /// Creates an IPv4 TCP stream socket and hooks both event-args objects to <see cref="OnComplete"/>.
      /// </summary>
      /// <param name="poller">Queue that receives the completion handlers.</param>
      public TSocket(TPoller poller)
      {
          this.poller = poller;
          this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
          this.innArgs.Completed += this.OnComplete;
          this.outArgs.Completed += this.OnComplete;
      }

      /// <summary>
      /// Creates a listening socket bound to <paramref name="host"/>:<paramref name="port"/>
      /// with a backlog of 100.
      /// </summary>
      /// <param name="poller">Queue that receives the completion handlers.</param>
      /// <param name="host">Literal IP address to bind to (parsed with <see cref="IPAddress.Parse(string)"/>).</param>
      /// <param name="port">Port to bind to.</param>
      public TSocket(TPoller poller, string host, int port): this(poller)
      {
          this.Bind(host, port);
          this.Listen(100);
      }

      /// <summary>
      /// The "host:port" string recorded by <see cref="ConnectAsync"/>; it is not set anywhere else in this class.
      /// </summary>
      public string RemoteAddress { get; private set; }

      /// <summary>The underlying socket, or null once <see cref="Dispose"/> has run.</summary>
      public Socket Socket
      {
          get
          {
              return this.socket;
          }
      }

      /// <summary>
      /// Closes the socket and releases both event-args objects. Calling it again is a no-op.
      /// </summary>
      public void Dispose()
      {
          if (this.socket == null)
          {
              return;
          }

          this.socket.Close();
          this.socket = null;

          // SocketAsyncEventArgs is IDisposable; release both instances instead of leaving
          // them to the finalizer. The Completed handlers are intentionally left subscribed
          // so that operations still in flight can report their completion.
          this.innArgs.Dispose();
          this.outArgs.Dispose();
      }

      /// <summary>Binds the socket to the given literal IP address and port.</summary>
      private void Bind(string host, int port)
      {
          this.socket.Bind(new IPEndPoint(IPAddress.Parse(host), port));
      }

      /// <summary>Puts the socket into the listening state with the given backlog.</summary>
      private void Listen(int backlog)
      {
          this.socket.Listen(backlog);
      }

      /// <summary>
      /// Starts accepting one incoming connection into <paramref name="accpetSocket"/>.
      /// </summary>
      /// <param name="accpetSocket">Wrapper whose underlying socket receives the accepted connection.</param>
      /// <returns>
      /// A task that completes with <c>true</c> when the accept succeeds and faults with an
      /// <see cref="Exception"/> carrying the socket error otherwise.
      /// </returns>
      public Task<bool> AcceptAsync(TSocket accpetSocket)
      {
          // Created without RunContinuationsAsynchronously on purpose: asynchronous completions
          // are already routed through the poller, and that is where continuations should run.
          var tcs = new TaskCompletionSource<bool>();
          this.innArgs.UserToken = tcs;
          this.innArgs.AcceptSocket = accpetSocket.socket;

          // AcceptAsync returns false when the operation completed synchronously; in that case
          // the Completed event is not raised, so finish the task inline.
          if (!this.socket.AcceptAsync(this.innArgs))
          {
              OnAcceptComplete(this.innArgs);
          }

          return tcs.Task;
      }

      /// <summary>
      /// Completes the task stored in <c>UserToken</c> according to the accept result.
      /// </summary>
      private static void OnAcceptComplete(SocketAsyncEventArgs e)
      {
          var tcs = (TaskCompletionSource<bool>)e.UserToken;
          e.UserToken = null;

          if (e.SocketError != SocketError.Success)
          {
              tcs.SetException(new Exception($"socket error: {e.SocketError}"));
              return;
          }

          tcs.SetResult(true);
      }

      /// <summary>
      /// Shared <c>Completed</c> handler: picks the completion method for the finished
      /// operation and queues it on the poller.
      /// </summary>
      private void OnComplete(object sender, SocketAsyncEventArgs e)
      {
          Action action;
          switch (e.LastOperation)
          {
              case SocketAsyncOperation.Connect:
                  action = () => OnConnectComplete(e);
                  break;
              case SocketAsyncOperation.Receive:
                  action = () => OnRecvComplete(e);
                  break;
              case SocketAsyncOperation.Send:
                  action = () => OnSendComplete(e);
                  break;
              case SocketAsyncOperation.Disconnect:
                  action = () => OnDisconnectComplete(e);
                  break;
              case SocketAsyncOperation.Accept:
                  action = () => OnAcceptComplete(e);
                  break;
              default:
                  // Thrown directly from this handler rather than queued on the poller.
                  throw new Exception($"socket error: {e.LastOperation}");
          }

          // Hand the callback over to the main thread for processing.
          this.poller.Add(action);
      }

      /// <summary>
      /// Starts connecting to <paramref name="host"/>:<paramref name="port"/> and records the
      /// target in <see cref="RemoteAddress"/>.
      /// </summary>
      /// <param name="host">Literal IP address of the remote endpoint.</param>
      /// <param name="port">Remote port.</param>
      /// <returns>
      /// <c>true</c> if the operation is pending (the result arrives through <see cref="OnConn"/>
      /// later); <c>false</c> if it completed synchronously, in which case <see cref="OnConn"/>
      /// has already been invoked before this method returns.
      /// </returns>
      public bool ConnectAsync(string host, int port)
      {
          this.RemoteAddress = $"{host}:{port}";
          this.outArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(host), port);

          if (this.socket.ConnectAsync(this.outArgs))
          {
              return true;
          }

          OnConnectComplete(this.outArgs);
          return false;
      }

      /// <summary>Forwards the connect result to <see cref="OnConn"/> when a handler is attached.</summary>
      private void OnConnectComplete(SocketAsyncEventArgs e)
      {
          this.OnConn?.Invoke(e.SocketError);
      }

      /// <summary>
      /// Starts receiving into <paramref name="buffer"/>.
      /// </summary>
      /// <param name="buffer">Destination buffer.</param>
      /// <param name="offset">Offset in <paramref name="buffer"/> where received data is written.</param>
      /// <param name="count">Maximum number of bytes to receive.</param>
      /// <returns>
      /// <c>true</c> if the operation is pending; <c>false</c> if it completed synchronously,
      /// in which case <see cref="OnRecv"/> has already been invoked.
      /// </returns>
      /// <exception cref="Exception">The buffer arguments were rejected by <c>SetBuffer</c>.</exception>
      public bool RecvAsync(byte[] buffer, int offset, int count)
      {
          try
          {
              this.innArgs.SetBuffer(buffer, offset, count);
          }
          catch (Exception e)
          {
              // buffer?.Length keeps a null buffer from masking the real failure with a
              // NullReferenceException while the message is being built.
              throw new Exception($"socket set buffer error: {buffer?.Length}, {offset}, {count}", e);
          }

          if (this.socket.ReceiveAsync(this.innArgs))
          {
              return true;
          }

          OnRecvComplete(this.innArgs);
          return false;
      }

      /// <summary>Forwards the receive result to <see cref="OnRecv"/> when a handler is attached.</summary>
      private void OnRecvComplete(SocketAsyncEventArgs e)
      {
          this.OnRecv?.Invoke(e.BytesTransferred, e.SocketError);
      }

      /// <summary>
      /// Starts sending <paramref name="count"/> bytes from <paramref name="buffer"/>.
      /// </summary>
      /// <param name="buffer">Source buffer.</param>
      /// <param name="offset">Offset in <paramref name="buffer"/> of the first byte to send.</param>
      /// <param name="count">Number of bytes to send.</param>
      /// <returns>
      /// <c>true</c> if the operation is pending; <c>false</c> if it completed synchronously,
      /// in which case <see cref="OnSend"/> has already been invoked.
      /// </returns>
      /// <exception cref="Exception">The buffer arguments were rejected by <c>SetBuffer</c>.</exception>
      public bool SendAsync(byte[] buffer, int offset, int count)
      {
          try
          {
              this.outArgs.SetBuffer(buffer, offset, count);
          }
          catch (Exception e)
          {
              // See RecvAsync: a null buffer must not hide the original exception.
              throw new Exception($"socket set buffer error: {buffer?.Length}, {offset}, {count}", e);
          }

          if (this.socket.SendAsync(this.outArgs))
          {
              return true;
          }

          OnSendComplete(this.outArgs);
          return false;
      }

      /// <summary>Forwards the send result to <see cref="OnSend"/> when a handler is attached.</summary>
      private void OnSendComplete(SocketAsyncEventArgs e)
      {
          this.OnSend?.Invoke(e.BytesTransferred, e.SocketError);
      }

      /// <summary>Forwards the disconnect result to <see cref="OnDisconnect"/> when a handler is attached.</summary>
      private void OnDisconnectComplete(SocketAsyncEventArgs e)
      {
          this.OnDisconnect?.Invoke(e.SocketError);
      }
  }
  183. }