TChannel.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. using System;
  2. using System.IO;
  3. using System.Net;
  4. using System.Net.Sockets;
  5. namespace ET
  6. {
  7. /// <summary>
  8. /// 封装Socket,将回调push到主线程处理
  9. /// </summary>
  10. public sealed class TChannel : AChannel
  11. {
  12. private readonly TService Service;
  13. private Socket socket;
  14. private SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();
  15. private SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs();
  16. private readonly CircularBuffer recvBuffer = new CircularBuffer();
  17. private readonly CircularBuffer sendBuffer = new CircularBuffer();
  18. private bool isSending;
  19. private bool isConnected;
  20. private readonly PacketParser parser;
  21. private readonly byte[] sendCache = new byte[Packet.OpcodeLength + Packet.ActorIdLength];
  22. private void OnComplete(object sender, SocketAsyncEventArgs e)
  23. {
  24. switch (e.LastOperation)
  25. {
  26. case SocketAsyncOperation.Connect:
  27. ThreadSynchronizationContext.Instance.Post(() => OnConnectComplete(e));
  28. break;
  29. case SocketAsyncOperation.Receive:
  30. ThreadSynchronizationContext.Instance.Post(() => OnRecvComplete(e));
  31. break;
  32. case SocketAsyncOperation.Send:
  33. ThreadSynchronizationContext.Instance.Post(() => OnSendComplete(e));
  34. break;
  35. case SocketAsyncOperation.Disconnect:
  36. ThreadSynchronizationContext.Instance.Post(() => OnDisconnectComplete(e));
  37. break;
  38. default:
  39. throw new Exception($"socket error: {e.LastOperation}");
  40. }
  41. }
  42. #region 网络线程
  43. public TChannel(long id, IPEndPoint ipEndPoint, TService service)
  44. {
  45. this.ChannelType = ChannelType.Connect;
  46. this.Id = id;
  47. this.Service = service;
  48. this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  49. this.socket.NoDelay = true;
  50. this.parser = new PacketParser(this.recvBuffer, this.Service);
  51. this.innArgs.Completed += this.OnComplete;
  52. this.outArgs.Completed += this.OnComplete;
  53. this.RemoteAddress = ipEndPoint;
  54. this.isConnected = false;
  55. this.isSending = false;
  56. ThreadSynchronizationContext.Instance.PostNext(this.ConnectAsync);
  57. }
  58. public TChannel(long id, Socket socket, TService service)
  59. {
  60. this.ChannelType = ChannelType.Accept;
  61. this.Id = id;
  62. this.Service = service;
  63. this.socket = socket;
  64. this.socket.NoDelay = true;
  65. this.parser = new PacketParser(this.recvBuffer, this.Service);
  66. this.innArgs.Completed += this.OnComplete;
  67. this.outArgs.Completed += this.OnComplete;
  68. this.RemoteAddress = (IPEndPoint)socket.RemoteEndPoint;
  69. this.isConnected = true;
  70. this.isSending = false;
  71. // 下一帧再开始读写
  72. ThreadSynchronizationContext.Instance.PostNext(() =>
  73. {
  74. this.StartRecv();
  75. this.StartSend();
  76. });
  77. }
  78. public override void Dispose()
  79. {
  80. if (this.IsDisposed)
  81. {
  82. return;
  83. }
  84. Log.Info($"channel dispose: {this.Id} {this.RemoteAddress}");
  85. long id = this.Id;
  86. this.Id = 0;
  87. this.Service.Remove(id);
  88. this.socket.Close();
  89. this.innArgs.Dispose();
  90. this.outArgs.Dispose();
  91. this.innArgs = null;
  92. this.outArgs = null;
  93. this.socket = null;
  94. }
  95. public void Send(long actorId, MemoryStream stream)
  96. {
  97. if (this.IsDisposed)
  98. {
  99. throw new Exception("TChannel已经被Dispose, 不能发送消息");
  100. }
  101. switch (this.Service.ServiceType)
  102. {
  103. case ServiceType.Inner:
  104. {
  105. int messageSize = (int)(stream.Length - stream.Position);
  106. if (messageSize > ushort.MaxValue * 16)
  107. {
  108. throw new Exception($"send packet too large: {stream.Length} {stream.Position}");
  109. }
  110. this.sendCache.WriteTo(0, messageSize);
  111. this.sendBuffer.Write(this.sendCache, 0, PacketParser.InnerPacketSizeLength);
  112. // actorId
  113. stream.GetBuffer().WriteTo(0, actorId);
  114. this.sendBuffer.Write(stream.GetBuffer(), (int)stream.Position,
  115. (int)(stream.Length - stream.Position));
  116. break;
  117. }
  118. case ServiceType.Outer:
  119. {
  120. stream.Seek(Packet.ActorIdLength, SeekOrigin.Begin); // 外网不需要actorId
  121. ushort messageSize = (ushort)(stream.Length - stream.Position);
  122. this.sendCache.WriteTo(0, messageSize);
  123. this.sendBuffer.Write(this.sendCache, 0, PacketParser.OuterPacketSizeLength);
  124. this.sendBuffer.Write(stream.GetBuffer(), (int)stream.Position,
  125. (int)(stream.Length - stream.Position));
  126. break;
  127. }
  128. }
  129. if (!this.isSending)
  130. {
  131. //this.StartSend();
  132. this.Service.NeedStartSend.Add(this.Id);
  133. }
  134. }
  135. private void ConnectAsync()
  136. {
  137. this.outArgs.RemoteEndPoint = this.RemoteAddress;
  138. if (this.socket.ConnectAsync(this.outArgs))
  139. {
  140. return;
  141. }
  142. OnConnectComplete(this.outArgs);
  143. }
  144. private void OnConnectComplete(object o)
  145. {
  146. if (this.socket == null)
  147. {
  148. return;
  149. }
  150. SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
  151. if (e.SocketError != SocketError.Success)
  152. {
  153. this.OnError((int)e.SocketError);
  154. return;
  155. }
  156. e.RemoteEndPoint = null;
  157. this.isConnected = true;
  158. this.StartRecv();
  159. this.StartSend();
  160. }
  161. private void OnDisconnectComplete(object o)
  162. {
  163. SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
  164. this.OnError((int)e.SocketError);
  165. }
  166. private void StartRecv()
  167. {
  168. while (true)
  169. {
  170. try
  171. {
  172. if (this.socket == null)
  173. {
  174. return;
  175. }
  176. int size = this.recvBuffer.ChunkSize - this.recvBuffer.LastIndex;
  177. this.innArgs.SetBuffer(this.recvBuffer.Last, this.recvBuffer.LastIndex, size);
  178. }
  179. catch (Exception e)
  180. {
  181. Log.Error($"tchannel error: {this.Id}\n{e}");
  182. this.OnError(ErrorCore.ERR_TChannelRecvError);
  183. return;
  184. }
  185. if (this.socket.ReceiveAsync(this.innArgs))
  186. {
  187. return;
  188. }
  189. this.HandleRecv(this.innArgs);
  190. }
  191. }
  192. private void OnRecvComplete(object o)
  193. {
  194. this.HandleRecv(o);
  195. if (this.socket == null)
  196. {
  197. return;
  198. }
  199. this.StartRecv();
  200. }
  201. private void HandleRecv(object o)
  202. {
  203. if (this.socket == null)
  204. {
  205. return;
  206. }
  207. SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
  208. if (e.SocketError != SocketError.Success)
  209. {
  210. this.OnError((int)e.SocketError);
  211. return;
  212. }
  213. if (e.BytesTransferred == 0)
  214. {
  215. this.OnError(ErrorCore.ERR_PeerDisconnect);
  216. return;
  217. }
  218. this.recvBuffer.LastIndex += e.BytesTransferred;
  219. if (this.recvBuffer.LastIndex == this.recvBuffer.ChunkSize)
  220. {
  221. this.recvBuffer.AddLast();
  222. this.recvBuffer.LastIndex = 0;
  223. }
  224. // 收到消息回调
  225. while (true)
  226. {
  227. // 这里循环解析消息执行,有可能,执行消息的过程中断开了session
  228. if (this.socket == null)
  229. {
  230. return;
  231. }
  232. try
  233. {
  234. bool ret = this.parser.Parse(out MemoryStream memoryBuffer);
  235. if (!ret)
  236. {
  237. break;
  238. }
  239. this.OnRead(memoryBuffer);
  240. }
  241. catch (Exception ee)
  242. {
  243. Log.DetectionError($"ip: {this.RemoteAddress} {ee}");
  244. this.OnError(ErrorCore.ERR_SocketError);
  245. return;
  246. }
  247. }
  248. }
  249. public void Update()
  250. {
  251. this.StartSend();
  252. }
  253. private void StartSend()
  254. {
  255. if (!this.isConnected)
  256. {
  257. return;
  258. }
  259. if (this.isSending)
  260. {
  261. return;
  262. }
  263. while (true)
  264. {
  265. try
  266. {
  267. if (this.socket == null)
  268. {
  269. this.isSending = false;
  270. return;
  271. }
  272. // 没有数据需要发送
  273. if (this.sendBuffer.Length == 0)
  274. {
  275. this.isSending = false;
  276. return;
  277. }
  278. this.isSending = true;
  279. int sendSize = this.sendBuffer.ChunkSize - this.sendBuffer.FirstIndex;
  280. if (sendSize > this.sendBuffer.Length)
  281. {
  282. sendSize = (int)this.sendBuffer.Length;
  283. }
  284. this.outArgs.SetBuffer(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
  285. if (this.socket.SendAsync(this.outArgs))
  286. {
  287. return;
  288. }
  289. HandleSend(this.outArgs);
  290. }
  291. catch (Exception e)
  292. {
  293. throw new Exception(
  294. $"socket set buffer error: {this.sendBuffer.First.Length}, {this.sendBuffer.FirstIndex}", e);
  295. }
  296. }
  297. }
  298. private void OnSendComplete(object o)
  299. {
  300. HandleSend(o);
  301. this.isSending = false;
  302. this.StartSend();
  303. }
  304. private void HandleSend(object o)
  305. {
  306. if (this.socket == null)
  307. {
  308. return;
  309. }
  310. SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
  311. if (e.SocketError != SocketError.Success)
  312. {
  313. this.OnError((int)e.SocketError);
  314. return;
  315. }
  316. if (e.BytesTransferred == 0)
  317. {
  318. this.OnError(ErrorCore.ERR_PeerDisconnect);
  319. return;
  320. }
  321. this.sendBuffer.FirstIndex += e.BytesTransferred;
  322. if (this.sendBuffer.FirstIndex == this.sendBuffer.ChunkSize)
  323. {
  324. this.sendBuffer.FirstIndex = 0;
  325. this.sendBuffer.RemoveFirst();
  326. }
  327. }
  328. private void OnRead(MemoryStream memoryStream)
  329. {
  330. try
  331. {
  332. long channelId = this.Id;
  333. this.Service.ReadCallback(channelId, memoryStream);
  334. }
  335. catch (Exception e)
  336. {
  337. Log.DetectionError($"{this.RemoteAddress} {memoryStream.Length} {e}");
  338. // 出现任何消息解析异常都要断开Session,防止客户端伪造消息
  339. this.OnError(ErrorCore.ERR_PacketParserError);
  340. }
  341. }
  342. private void OnError(int error)
  343. {
  344. Log.Info($"TChannel OnError: {error} {this.RemoteAddress}");
  345. long channelId = this.Id;
  346. this.Service.Remove(channelId);
  347. this.Service.ErrorCallback(channelId, error);
  348. }
  349. #endregion
  350. }
  351. }