KChannel.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Runtime.InteropServices;
  7. namespace ET
  8. {
  /// <summary>
  /// An outgoing message together with the actor id it is addressed to.
  /// KChannel queues these while the connection is not yet established and
  /// hands them to KcpSend once it is.
  /// </summary>
  public struct KcpWaitPacket
  {
      /// <summary>
      /// Actor id; KcpSend writes it at offset 0 of the stream's buffer when the
      /// owning service is of the Inner type.
      /// </summary>
      public long ActorId;

      /// <summary>
      /// Message payload. The bytes from the stream's current Position up to its
      /// Length are what gets sent.
      /// </summary>
      public MemoryStream MemoryStream;
  }
  14. public class KChannel : AChannel
  15. {
  /// <summary>
  /// Maps a native KCP handle to the channel that owns it. Entries are added in
  /// InitKcp and removed in Dispose.
  /// </summary>
  public static readonly Dictionary<IntPtr, KChannel> KcpPtrChannels = new Dictionary<IntPtr, KChannel>();

  // ---- Owner and transport ------------------------------------------------

  /// <summary>Service this channel belongs to; supplies the clock, the update scheduling and the read/error callbacks.</summary>
  public KService Service;

  /// <summary>Socket used for SendTo; set to null on Dispose.</summary>
  private Socket socket;

  /// <summary>Native KCP handle; IntPtr.Zero until the handle is created and again after Dispose.</summary>
  public IntPtr kcp { get; private set; }

  // ---- Connection state ---------------------------------------------------

  /// <summary>Messages queued by Send while the channel is not yet connected; flushed in HandleConnnect.</summary>
  private readonly Queue<KcpWaitPacket> sendBuffer = new Queue<KcpWaitPacket>();

  /// <summary>Service time of the most recent connect attempt / received message. It is only written in the code visible here.</summary>
  private uint lastRecvTime;

  /// <summary>Service time at construction; Update compares against it for the connect timeout.</summary>
  public readonly uint CreateTime;

  /// <summary>Connection id of this side.</summary>
  public uint LocalConn { get; set; }

  /// <summary>Connection id of the remote side.</summary>
  public uint RemoteConn { get; set; }

  /// <summary>Scratch buffer for building outgoing datagrams (SYN packet, MSG-wrapped KCP output, split header).</summary>
  private readonly byte[] sendCache = new byte[2 * 1024];

  /// <summary>True once HandleConnnect or HandleRecv has run.</summary>
  public bool IsConnected { get; private set; }

  /// <summary>Address string that is only logged by Connect in this file; set by callers.</summary>
  public string RealAddress { get; set; }

  // ---- Receive state ------------------------------------------------------

  /// <summary>Messages larger than this many bytes are sent as a split header followed by chunks of at most this size.</summary>
  private const int maxPacketSize = 10000;

  /// <summary>Reusable receive stream for messages that are not split.</summary>
  private MemoryStream ms = new MemoryStream(maxPacketSize);

  /// <summary>Stream currently being filled by HandleRecv (either <c>ms</c> or a dedicated stream for a split message).</summary>
  private MemoryStream readMemory;

  /// <summary>Bytes of the current split message that are still missing; 0 when no split message is in progress.</summary>
  private int needReadSplitCount;
  /// <summary>
  /// Registers the current KCP handle in <c>KcpPtrChannels</c> and applies the
  /// KCP tuning that matches the service type. Service types other than Inner
  /// and Outer receive no tuning.
  /// </summary>
  private void InitKcp()
  {
      IntPtr handle = this.kcp;
      KcpPtrChannels.Add(handle, this);

      var serviceType = this.Service.ServiceType;
      if (serviceType == ServiceType.Inner)
      {
          // Inner links: largest window, 1400-byte MTU.
          Kcp.KcpNodelay(handle, 1, 10, 2, 1);
          Kcp.KcpWndsize(handle, ushort.MaxValue, ushort.MaxValue);
          Kcp.KcpSetmtu(handle, 1400); // default is 1400
          Kcp.KcpSetminrto(handle, 30);
      }
      else if (serviceType == ServiceType.Outer)
      {
          // Outer links: smaller window and a 470-byte MTU.
          Kcp.KcpNodelay(handle, 1, 10, 2, 1);
          Kcp.KcpWndsize(handle, 256, 256);
          Kcp.KcpSetmtu(handle, 470);
          Kcp.KcpSetminrto(handle, 30);
      }
  }
  /// <summary>
  /// Creates an outgoing (Connect) channel and immediately sends the first
  /// connection request. The KCP handle stays IntPtr.Zero until HandleConnnect runs.
  /// </summary>
  /// <param name="id">Channel id.</param>
  /// <param name="localConn">Connection id of this side.</param>
  /// <param name="socket">Socket used to send datagrams.</param>
  /// <param name="remoteEndPoint">Address of the peer to connect to.</param>
  /// <param name="kService">Owning service.</param>
  public KChannel(long id, uint localConn, Socket socket, IPEndPoint remoteEndPoint, KService kService)
  {
      // Identity first, so the log line below can print it.
      this.LocalConn = localConn;
      this.Id = id;
      this.ChannelType = ChannelType.Connect;

      Log.Info($"channel create: {this.Id} {this.LocalConn} {remoteEndPoint} {this.ChannelType}");

      // No KCP state yet; it is created once the connection is established.
      this.kcp = IntPtr.Zero;

      this.socket = socket;
      this.RemoteAddress = remoteEndPoint;
      this.Service = kService;

      this.lastRecvTime = kService.TimeNow;
      this.CreateTime = kService.TimeNow;

      // Send the first connection request right away.
      this.Connect();
  }
  /// <summary>
  /// Creates an accepted (Accept) channel. Both connection ids are already
  /// known, so the KCP handle is created and configured immediately.
  /// </summary>
  /// <param name="id">Channel id.</param>
  /// <param name="localConn">Connection id of this side.</param>
  /// <param name="remoteConn">Connection id of the remote side; also passed to KcpCreate.</param>
  /// <param name="socket">Socket used to send datagrams.</param>
  /// <param name="remoteEndPoint">Address of the peer.</param>
  /// <param name="kService">Owning service.</param>
  public KChannel(long id, uint localConn, uint remoteConn, Socket socket, IPEndPoint remoteEndPoint, KService kService)
  {
      this.Id = id;
      this.ChannelType = ChannelType.Accept;

      Log.Info($"channel create: {this.Id} {localConn} {remoteConn} {remoteEndPoint} {this.ChannelType}");

      // InitKcp reads Service, so it has to be assigned before the handle is set up.
      this.Service = kService;
      this.LocalConn = localConn;
      this.RemoteConn = remoteConn;
      this.RemoteAddress = remoteEndPoint;
      this.socket = socket;

      this.kcp = Kcp.KcpCreate(this.RemoteConn, IntPtr.Zero);
      this.InitKcp();

      this.lastRecvTime = kService.TimeNow;
      this.CreateTime = kService.TimeNow;
  }
  /// <summary>
  /// Tears the channel down: removes it from the service, releases the native
  /// KCP handle (if any) and drops the socket reference. Calling it on an
  /// already disposed channel does nothing.
  /// </summary>
  public override void Dispose()
  {
      if (this.IsDisposed)
      {
          return;
      }

      uint localConn = this.LocalConn;
      uint remoteConn = this.RemoteConn;
      Log.Info($"channel dispose: {this.Id} {localConn} {remoteConn}");

      // Clear the id before asking the service to remove the channel.
      long id = this.Id;
      this.Id = 0;
      this.Service.Remove(id);

      // Sending a disconnect notification to the peer is currently disabled:
      // this.Service.Disconnect(localConn, remoteConn, this.Error, this.RemoteAddress, 3);

      IntPtr handle = this.kcp;
      if (handle != IntPtr.Zero)
      {
          KcpPtrChannels.Remove(handle);
          Kcp.KcpRelease(handle);
          this.kcp = IntPtr.Zero;
      }

      this.socket = null;
  }
  /// <summary>
  /// Marks the channel as connected: creates and configures the KCP handle,
  /// then sends every message that was queued while the connection was pending.
  /// Does nothing if the channel is already connected.
  /// </summary>
  public void HandleConnnect()
  {
      // Already connected: nothing to do.
      if (this.IsConnected)
      {
          return;
      }

      this.kcp = Kcp.KcpCreate(this.RemoteConn, IntPtr.Zero);
      this.InitKcp();

      Log.Info($"channel connected: {this.Id} {this.LocalConn} {this.RemoteConn} {this.RemoteAddress}");
      this.IsConnected = true;
      this.lastRecvTime = this.Service.TimeNow;

      // Flush everything Send() queued before the connection was established.
      while (this.sendBuffer.Count > 0)
      {
          KcpWaitPacket pending = this.sendBuffer.Dequeue();
          this.KcpSend(pending);
      }
  }
  /// <summary>
  /// Sends a connection request (SYN) to the remote address and schedules the
  /// next update 300 ms later so the request is repeated while the channel is
  /// still not connected. Any exception is logged and reported as
  /// ERR_SocketCantSend.
  /// </summary>
  private void Connect()
  {
      try
      {
          uint timeNow = this.Service.TimeNow;
          this.lastRecvTime = timeNow;

          // 9-byte SYN packet: [0] type, [1..4] local conn id, [5..8] remote conn id.
          byte[] packet = this.sendCache;
          packet.WriteTo(0, KcpProtocalType.SYN);
          packet.WriteTo(1, this.LocalConn);
          packet.WriteTo(5, this.RemoteConn);
          this.socket.SendTo(packet, 0, 9, SocketFlags.None, this.RemoteAddress);

          Log.Info($"kchannel connect {this.Id} {this.LocalConn} {this.RemoteConn} {this.RealAddress} {this.socket.LocalEndPoint}");

          // Update again in 300 ms, which re-sends the connect request.
          this.Service.AddToUpdateNextTime(timeNow + 300, this.Id);
      }
      catch (Exception e)
      {
          Log.Error(e);
          this.OnError(ErrorCore.ERR_SocketCantSend);
      }
  }
  /// <summary>
  /// Periodic tick. While the channel is not connected it enforces the
  /// 10-second connect timeout and, for Connect channels, re-sends the
  /// connection request. Once connected it drives the KCP state machine and
  /// schedules the next tick at the time KCP asks for.
  /// </summary>
  public void Update()
  {
      if (this.IsDisposed)
      {
          return;
      }

      uint timeNow = this.Service.TimeNow;

      if (!this.IsConnected)
      {
          // Give up when the connection has not been established within 10 seconds.
          bool timedOut = timeNow - this.CreateTime > 10000;
          if (timedOut)
          {
              Log.Error($"kChannel connect timeout: {this.Id} {this.RemoteConn} {timeNow} {this.CreateTime} {this.ChannelType} {this.RemoteAddress}");
              this.OnError(ErrorCore.ERR_KcpConnectTimeout);
              return;
          }

          // Only the connecting side re-sends the request.
          if (this.ChannelType == ChannelType.Connect)
          {
              this.Connect();
          }

          return;
      }

      if (this.kcp == IntPtr.Zero)
      {
          return;
      }

      try
      {
          Kcp.KcpUpdate(this.kcp, timeNow);
      }
      catch (Exception e)
      {
          Log.Error(e);
          this.OnError(ErrorCore.ERR_SocketError);
          return;
      }

      // Ask KCP when it wants to be updated next and schedule that tick.
      uint nextUpdateTime = Kcp.KcpCheck(this.kcp, timeNow);
      this.Service.AddToUpdateNextTime(nextUpdateTime, this.Id);
  }
  /// <summary>
  /// Feeds a received datagram into KCP and then delivers every complete
  /// message KCP has available to <c>OnRead</c>.
  /// </summary>
  /// <remarks>
  /// Messages larger than <c>maxPacketSize</c> arrive as an 8-byte split header
  /// (int32 0 followed by int32 total size, as written by KcpSend) and then a
  /// series of chunks that are reassembled into one stream before delivery.
  /// Each chunk is now checked against the space still free in the reassembly
  /// buffer before it is copied; previously an oversized chunk was detected
  /// only after the copy had been attempted.
  /// </remarks>
  /// <param name="date">Buffer holding the received KCP data (parameter name kept for compatibility).</param>
  /// <param name="offset">Index in <paramref name="date"/> where the KCP data starts.</param>
  /// <param name="length">Number of bytes of KCP data.</param>
  public void HandleRecv(byte[] date, int offset, int length)
  {
      if (this.IsDisposed)
      {
          return;
      }

      this.IsConnected = true;

      Kcp.KcpInput(this.kcp, date, offset, length);
      this.Service.AddToUpdateNextTime(0, this.Id);

      while (true)
      {
          // OnRead/OnError handlers may have disposed the channel.
          if (this.IsDisposed)
          {
              break;
          }

          int n = Kcp.KcpPeeksize(this.kcp);
          if (n < 0)
          {
              // No complete message available.
              break;
          }

          if (n == 0)
          {
              this.OnError((int)SocketError.NetworkReset);
              return;
          }

          if (this.needReadSplitCount > 0) // a split message is being reassembled
          {
              // Reject a chunk that does not fit into the remaining space BEFORE
              // copying it, so the copy below can never run past the buffer.
              if (n > this.needReadSplitCount)
              {
                  Log.Error($"kchannel read error2: {this.LocalConn} {this.RemoteConn}");
                  this.OnError(ErrorCore.ERR_KcpSplitError);
                  return;
              }

              byte[] buffer = this.readMemory.GetBuffer();
              int writeOffset = (int)this.readMemory.Length - this.needReadSplitCount;
              int count = Kcp.KcpRecv(this.kcp, buffer, writeOffset, n);
              if (n != count)
              {
                  Log.Error($"kchannel read error1: {this.LocalConn} {this.RemoteConn}");
                  this.OnError(ErrorCore.ERR_KcpReadNotSame);
                  return;
              }

              this.needReadSplitCount -= count;

              // More chunks are still missing.
              if (this.needReadSplitCount != 0)
              {
                  continue;
              }
          }
          else
          {
              // Ordinary message (or a split header): read it into the reusable stream.
              this.readMemory = this.ms;
              this.readMemory.SetLength(n);
              this.readMemory.Seek(0, SeekOrigin.Begin);

              byte[] buffer = this.readMemory.GetBuffer();
              int count = Kcp.KcpRecv(this.kcp, buffer, 0, n);
              if (n != count)
              {
                  break;
              }

              // An 8-byte message whose first int32 is 0 is a split header.
              if (n == 8)
              {
                  int headInt = BitConverter.ToInt32(this.readMemory.GetBuffer(), 0);
                  if (headInt == 0)
                  {
                      this.needReadSplitCount = BitConverter.ToInt32(this.readMemory.GetBuffer(), 4);

                      // The sender only splits messages larger than maxPacketSize, so a
                      // smaller (or negative) announced size is a protocol violation.
                      if (this.needReadSplitCount <= maxPacketSize)
                      {
                          Log.Error($"kchannel read error3: {this.needReadSplitCount} {this.LocalConn} {this.RemoteConn}");
                          this.OnError(ErrorCore.ERR_KcpSplitCountError);
                          return;
                      }

                      // NOTE(review): the announced size comes from the peer and has no
                      // upper bound here; consider capping it before allocating.
                      this.readMemory = new MemoryStream(this.needReadSplitCount);
                      this.readMemory.SetLength(this.needReadSplitCount);
                      this.readMemory.Seek(0, SeekOrigin.Begin);
                      continue;
                  }
              }
          }

          // Position the stream just past the header fields for this service type.
          switch (this.Service.ServiceType)
          {
              case ServiceType.Inner:
                  this.readMemory.Seek(Packet.ActorIdLength + Packet.OpcodeLength, SeekOrigin.Begin);
                  break;
              case ServiceType.Outer:
                  this.readMemory.Seek(Packet.OpcodeLength, SeekOrigin.Begin);
                  break;
          }

          this.lastRecvTime = this.Service.TimeNow;

          // Detach the stream from the channel before handing it to the reader.
          MemoryStream mem = this.readMemory;
          this.readMemory = null;
          this.OnRead(mem);
      }
  }
  /// <summary>
  /// Sends one block of KCP output to the remote address, prefixed with a
  /// 5-byte header (MSG type byte plus the local connection id). Any exception
  /// is logged and reported as ERR_SocketCantSend.
  /// </summary>
  /// <param name="bytes">Pointer to the native KCP output data.</param>
  /// <param name="count">Number of bytes at <paramref name="bytes"/>.</param>
  public void Output(IntPtr bytes, int count)
  {
      // KCP must not emit anything before the connection is established. Update is
      // not expected to run KCP in that state, so this is only a safety net.
      if (this.IsDisposed || !this.IsConnected)
      {
          return;
      }

      try
      {
          if (count == 0)
          {
              Log.Error($"output 0");
              return;
          }

          byte[] datagram = this.sendCache;
          datagram.WriteTo(0, KcpProtocalType.MSG);
          // Every message starts with this channel's connection id.
          datagram.WriteTo(1, this.LocalConn);
          Marshal.Copy(bytes, datagram, 5, count);

          this.socket.SendTo(datagram, 0, count + 5, SocketFlags.None, this.RemoteAddress);
      }
      catch (Exception e)
      {
          Log.Error(e);
          this.OnError(ErrorCore.ERR_SocketCantSend);
      }
  }
  /// <summary>
  /// Hands one packet to KCP. The payload is the part of the stream from its
  /// current Position to its Length. Payloads longer than <c>maxPacketSize</c>
  /// are announced with an 8-byte split header (int32 0, int32 total size) and
  /// then sent in chunks of at most <c>maxPacketSize</c> bytes.
  /// </summary>
  /// <param name="kcpWaitPacket">Packet to send.</param>
  private void KcpSend(KcpWaitPacket kcpWaitPacket)
  {
      if (this.IsDisposed)
      {
          return;
      }

      MemoryStream stream = kcpWaitPacket.MemoryStream;
      int start = (int)stream.Position;
      int count = (int)(stream.Length - stream.Position);
      byte[] payload = stream.GetBuffer();

      // Inner services carry the actor id at the very start of the buffer.
      if (this.Service.ServiceType == ServiceType.Inner)
      {
          payload.WriteTo(0, kcpWaitPacket.ActorId);
      }

      if (count > maxPacketSize)
      {
          // Too large for one message: announce the split first ...
          this.sendCache.WriteTo(0, 0);
          this.sendCache.WriteTo(4, count);
          Kcp.KcpSend(this.kcp, this.sendCache, 0, 8);

          // ... then send the payload chunk by chunk.
          for (int sent = 0; sent < count;)
          {
              int chunk = Math.Min(count - sent, maxPacketSize);
              Kcp.KcpSend(this.kcp, payload, start + sent, chunk);
              sent += chunk;
          }
      }
      else
      {
          Kcp.KcpSend(this.kcp, payload, start, count);
      }

      // Ask the service to update this channel as soon as possible.
      this.Service.AddToUpdateNextTime(0, this.Id);
  }
  /// <summary>
  /// Sends a message to the peer. While the channel is not connected the
  /// message is queued and sent later by HandleConnnect. If KCP already has
  /// more segments waiting than the service type allows, the channel is
  /// reported as failed (ERR_KcpWaitSendSizeTooLarge) and the message is dropped.
  /// </summary>
  /// <param name="actorId">Actor id that accompanies the message.</param>
  /// <param name="stream">Message payload.</param>
  /// <exception cref="ArgumentOutOfRangeException">The service type is neither Inner nor Outer.</exception>
  public void Send(long actorId, MemoryStream stream)
  {
      if (this.kcp != IntPtr.Zero)
      {
          // Check the backlog waiting to be sent; when it exceeds the limit the
          // connection is considered broken.
          int n = Kcp.KcpWaitsnd(this.kcp);

          int maxWaitSize;
          var serviceType = this.Service.ServiceType;
          if (serviceType == ServiceType.Inner)
          {
              maxWaitSize = Kcp.InnerMaxWaitSize;
          }
          else if (serviceType == ServiceType.Outer)
          {
              maxWaitSize = Kcp.OuterMaxWaitSize;
          }
          else
          {
              throw new ArgumentOutOfRangeException();
          }

          if (n > maxWaitSize)
          {
              Log.Error($"kcp wait snd too large: {n}: {this.Id} {this.LocalConn} {this.RemoteConn}");
              this.OnError(ErrorCore.ERR_KcpWaitSendSizeTooLarge);
              return;
          }
      }

      KcpWaitPacket packet = new KcpWaitPacket() { ActorId = actorId, MemoryStream = stream };

      if (this.IsConnected)
      {
          this.KcpSend(packet);
          return;
      }

      // Not connected yet: keep the message until HandleConnnect flushes the queue.
      this.sendBuffer.Enqueue(packet);
  }
  /// <summary>
  /// Forwards a fully received message to the owning service, tagged with this channel's id.
  /// </summary>
  /// <param name="memoryStream">Stream holding the received message.</param>
  private void OnRead(MemoryStream memoryStream) => this.Service.OnRead(this.Id, memoryStream);
  /// <summary>
  /// Reports a fatal channel error: removes the channel from the service and
  /// then notifies the service's error handler.
  /// </summary>
  /// <param name="error">Error code to report.</param>
  public void OnError(int error)
  {
      // Capture the id first: Dispose() resets Id to 0, and the error callback
      // still needs the original value after the channel has been removed.
      long failedChannelId = this.Id;

      this.Service.Remove(failedChannelId);
      this.Service.OnError(failedChannelId, error);
  }
  401. }
  402. }