TSocket.cs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. using System;
  2. using System.Net;
  3. using System.Net.Sockets;
  4. using System.Threading.Tasks;
  5. namespace Base
  6. {
  7. /// <summary>
  8. /// 封装Socket,将回调push到主线程处理
  9. /// </summary>
  10. public class TSocket: IDisposable
  11. {
  12. private readonly TPoller poller;
  13. private Socket socket;
  14. private readonly SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();
  15. private readonly SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs();
  16. public Action<SocketError> OnConn;
  17. public Action<int, SocketError> OnRecv;
  18. public Action<int, SocketError> OnSend;
  19. public Action<SocketError> OnDisconnect;
  20. public TSocket(TPoller poller)
  21. {
  22. this.poller = poller;
  23. this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  24. this.innArgs.Completed += this.OnComplete;
  25. this.outArgs.Completed += this.OnComplete;
  26. }
  27. public TSocket(TPoller poller, string host, int port): this(poller)
  28. {
  29. this.Bind(host, port);
  30. this.Listen(100);
  31. }
  32. public Socket Socket
  33. {
  34. get
  35. {
  36. return this.socket;
  37. }
  38. }
  39. public string RemoteAddress
  40. {
  41. get
  42. {
  43. IPEndPoint ipEndPoint = (IPEndPoint)this.socket.RemoteEndPoint;
  44. return ipEndPoint.Address + ":" + ipEndPoint.Port;
  45. }
  46. }
  47. public void Dispose()
  48. {
  49. if (this.socket == null)
  50. {
  51. return;
  52. }
  53. this.socket.Close();
  54. this.socket = null;
  55. }
  56. private void Bind(string host, int port)
  57. {
  58. this.socket.Bind(new IPEndPoint(IPAddress.Parse(host), port));
  59. }
  60. private void Listen(int backlog)
  61. {
  62. this.socket.Listen(backlog);
  63. }
  64. public Task<bool> AcceptAsync(TSocket accpetSocket)
  65. {
  66. var tcs = new TaskCompletionSource<bool>();
  67. this.innArgs.UserToken = tcs;
  68. this.innArgs.AcceptSocket = accpetSocket.socket;
  69. if (!this.socket.AcceptAsync(this.innArgs))
  70. {
  71. OnAcceptComplete(this.innArgs);
  72. }
  73. return tcs.Task;
  74. }
  75. private static void OnAcceptComplete(SocketAsyncEventArgs e)
  76. {
  77. var tcs = (TaskCompletionSource<bool>)e.UserToken;
  78. e.UserToken = null;
  79. if (e.SocketError != SocketError.Success)
  80. {
  81. tcs.SetException(new Exception($"socket error: {e.SocketError}"));
  82. return;
  83. }
  84. tcs.SetResult(true);
  85. }
  86. private void OnComplete(object sender, SocketAsyncEventArgs e)
  87. {
  88. Action action;
  89. switch (e.LastOperation)
  90. {
  91. case SocketAsyncOperation.Connect:
  92. action = () => OnConnectComplete(e);
  93. break;
  94. case SocketAsyncOperation.Receive:
  95. action = () => OnRecvComplete(e);
  96. break;
  97. case SocketAsyncOperation.Send:
  98. action = () => OnSendComplete(e);
  99. break;
  100. case SocketAsyncOperation.Disconnect:
  101. action = () => OnDisconnectComplete(e);
  102. break;
  103. case SocketAsyncOperation.Accept:
  104. action = () => OnAcceptComplete(e);
  105. break;
  106. default:
  107. throw new Exception($"socket error: {e.LastOperation}");
  108. }
  109. // 回调到主线程处理
  110. this.poller.Add(action);
  111. }
  112. public bool ConnectAsync(string host, int port)
  113. {
  114. this.outArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(host), port);
  115. if (this.socket.ConnectAsync(this.outArgs))
  116. {
  117. return true;
  118. }
  119. OnConnectComplete(this.outArgs);
  120. return false;
  121. }
  122. private void OnConnectComplete(SocketAsyncEventArgs e)
  123. {
  124. if (this.OnConn == null)
  125. {
  126. return;
  127. }
  128. this.OnConn(e.SocketError);
  129. }
  130. public bool RecvAsync(byte[] buffer, int offset, int count)
  131. {
  132. try
  133. {
  134. this.innArgs.SetBuffer(buffer, offset, count);
  135. }
  136. catch (Exception e)
  137. {
  138. throw new Exception($"socket set buffer error: {buffer.Length}, {offset}, {count}", e);
  139. }
  140. if (this.socket.ReceiveAsync(this.innArgs))
  141. {
  142. return true;
  143. }
  144. OnRecvComplete(this.innArgs);
  145. return false;
  146. }
  147. private void OnRecvComplete(SocketAsyncEventArgs e)
  148. {
  149. if (this.OnRecv == null)
  150. {
  151. return;
  152. }
  153. this.OnRecv(e.BytesTransferred, e.SocketError);
  154. }
  155. public bool SendAsync(byte[] buffer, int offset, int count)
  156. {
  157. try
  158. {
  159. this.outArgs.SetBuffer(buffer, offset, count);
  160. }
  161. catch (Exception e)
  162. {
  163. throw new Exception($"socket set buffer error: {buffer.Length}, {offset}, {count}", e);
  164. }
  165. if (this.socket.SendAsync(this.outArgs))
  166. {
  167. return true;
  168. }
  169. OnSendComplete(this.outArgs);
  170. return false;
  171. }
  172. private void OnSendComplete(SocketAsyncEventArgs e)
  173. {
  174. if (this.OnSend == null)
  175. {
  176. return;
  177. }
  178. this.OnSend(e.BytesTransferred, e.SocketError);
  179. }
  180. private void OnDisconnectComplete(SocketAsyncEventArgs e)
  181. {
  182. if (this.OnDisconnect == null)
  183. {
  184. return;
  185. }
  186. this.OnDisconnect(e.SocketError);
  187. }
  188. }
  189. }