ReliableChannel.cs 13 KB


  1. #if DEBUG && !UNITY_WP_8_1 && !UNITY_WSA_8_1
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Threading;
  5. namespace FlyingWormConsole3.LiteNetLib
  6. {
  7. internal sealed class ReliableChannel
  8. {
  /// <summary>
  /// One slot of the send window: a packet waiting for acknowledgement together
  /// with the time it was last sent.
  /// </summary>
  private class PendingPacket
  {
      /// <summary>Packet occupying the slot, or null when the slot is free.</summary>
      public NetPacket Packet;

      /// <summary>Time of the most recent send; null while the packet has never been sent.</summary>
      public DateTime? TimeStamp;

      /// <summary>
      /// Empties the slot and hands back the packet it held.
      /// </summary>
      /// <returns>The packet that was stored, or null if the slot was already empty.</returns>
      public NetPacket GetAndClear()
      {
          NetPacket taken = Packet;
          TimeStamp = null;
          Packet = null;
          return taken;
      }
  }
  // Packets handed to AddToQueue that have not yet been assigned a sequence number.
  21. private readonly Queue<NetPacket> _outgoingPackets;
  // Per-slot "received" flags (index = sequence % window size) that SendAcks packs into the acks packet.
  // This array is also the lock object guarding the receive-window state.
  22. private readonly bool[] _outgoingAcks; //for send acks
  // Send-window slots (index = sequence % window size) holding packets until they are acknowledged.
  // This array is also the lock object guarding the send-window state.
  23. private readonly PendingPacket[] _pendingPackets; //for unacked packets and duplicates
  // Allocated only in ordered mode: packets that arrived ahead of _remoteSequence, held until their turn.
  24. private readonly NetPacket[] _receivedPackets; //for order
  // Allocated only in unordered mode: flags for sequences already delivered ahead of _remoteSequence.
  25. private readonly bool[] _earlyReceived; //for unordered
  // Sequence number that will be assigned to the next outgoing packet (identifier spelling is original).
  26. private int _localSeqence;
  // Next incoming sequence number expected for in-order processing.
  27. private int _remoteSequence;
  // Start of the send window; advanced by ProcessAck when the ack for this sequence arrives.
  28. private int _localWindowStart;
  // Start of the receive window; written into the Sequence field of outgoing acks packets.
  29. private int _remoteWindowStart;
  // Peer used for sending raw data, recycling packets and delivering incoming packets.
  30. private readonly NetPeer _peer;
  // Set by ProcessPacket when a packet is accepted; cleared by SendAcks.
  31. private bool _mustSendAcks;
  // True when packets must be delivered in sequence order.
  32. private readonly bool _ordered;
  // Number of slots in the send and receive windows.
  33. private readonly int _windowSize;
  // Number of ack flags packed into each byte of an acks packet.
  34. private const int BitsInByte = 8;
  // Round-robin cursor into _pendingPackets used by SendNextPacket.
  35. private int _queueIndex;
  /// <summary>
  /// Number of packets still waiting in the outgoing queue, i.e. not yet moved
  /// into the send window.
  /// </summary>
  public int PacketsInQueue
  {
      get
      {
          return _outgoingPackets.Count;
      }
  }
  /// <summary>
  /// Creates a reliable channel bound to a peer.
  /// </summary>
  /// <param name="peer">Peer used to send data, recycle packets and deliver incoming packets.</param>
  /// <param name="ordered">True to deliver packets strictly in sequence order.</param>
  /// <param name="windowSize">Number of slots in the send and receive windows.</param>
  public ReliableChannel(NetPeer peer, bool ordered, int windowSize)
  {
      _windowSize = windowSize;
      _ordered = ordered;
      _peer = peer;

      _outgoingPackets = new Queue<NetPacket>(_windowSize);
      _outgoingAcks = new bool[_windowSize];

      // Pre-create every send-window slot so the slots themselves are never null.
      _pendingPackets = new PendingPacket[_windowSize];
      int slot = 0;
      while (slot < _pendingPackets.Length)
      {
          _pendingPackets[slot] = new PendingPacket();
          slot++;
      }

      // Only the reorder storage needed for the selected mode is allocated.
      if (!_ordered)
      {
          _earlyReceived = new bool[_windowSize];
      }
      else
      {
          _receivedPackets = new NetPacket[_windowSize];
      }

      // Both directions start at sequence zero.
      _remoteWindowStart = 0;
      _remoteSequence = 0;
      _localSeqence = 0;
      _localWindowStart = 0;
  }
  /// <summary>
  /// Handles an incoming acks packet: every pending packet whose ack bit is set
  /// is removed from the send window and recycled.
  /// </summary>
  /// <param name="packet">
  /// Acks packet. Its Sequence is the first sequence number covered by the bitfield,
  /// and the bitfield starts at NetConstants.SequencedHeaderSize in RawData.
  /// </param>
  public void ProcessAck(NetPacket packet)
  {
      // One bit per window slot, rounded up to whole bytes, plus the sequenced header.
      int validPacketSize = (_windowSize - 1) / BitsInByte + 1 + NetConstants.SequencedHeaderSize;
      if (packet.Size != validPacketSize)
      {
          NetUtils.DebugWrite("[PA]Invalid acks packet size");
          return;
      }

      ushort ackWindowStart = packet.Sequence;
      // Sequence numbers are always reduced modulo MaxSequence, so MaxSequence itself
      // is out of range as well (same bound as the check in ProcessPacket).
      if (ackWindowStart >= NetConstants.MaxSequence)
      {
          NetUtils.DebugWrite("[PA]Bad window start");
          return;
      }

      //check relevance
      if (NetUtils.RelativeSequenceNumber(ackWindowStart, _localWindowStart) <= -_windowSize)
      {
          NetUtils.DebugWrite("[PA]Old acks");
          return;
      }

      byte[] acksData = packet.RawData;
      NetUtils.DebugWrite("[PA]AcksStart: {0}", ackWindowStart);
      int startByte = NetConstants.SequencedHeaderSize;

      // lock (rather than a bare Monitor.Enter/Exit pair) guarantees the monitor is
      // released even if something inside the loop throws.
      lock (_pendingPackets)
      {
          for (int i = 0; i < _windowSize; i++)
          {
              int ackSequence = (ackWindowStart + i) % NetConstants.MaxSequence;
              if (NetUtils.RelativeSequenceNumber(ackSequence, _localWindowStart) < 0)
              {
                  //Skip old ack
                  continue;
              }

              int currentByte = startByte + i / BitsInByte;
              int currentBit = i % BitsInByte;
              if ((acksData[currentByte] & (1 << currentBit)) == 0)
              {
                  //Skip false ack
                  continue;
              }

              if (ackSequence == _localWindowStart)
              {
                  //Move window: the oldest outstanding sequence has been acknowledged
                  _localWindowStart = (_localWindowStart + 1) % NetConstants.MaxSequence;
              }

              NetPacket removed = _pendingPackets[ackSequence % _windowSize].GetAndClear();
              if (removed != null)
              {
                  _peer.Recycle(removed);
                  NetUtils.DebugWrite("[PA]Removing reliableInOrder ack: {0} - true", ackSequence);
              }
              else
              {
                  // Slot was already empty (for example the ack was seen before).
                  NetUtils.DebugWrite("[PA]Removing reliableInOrder ack: {0} - false", ackSequence);
              }
          }
      }
  }
  /// <summary>
  /// Appends a packet to the outgoing queue. The packet receives its sequence
  /// number later, when it is moved into the send window.
  /// </summary>
  /// <param name="packet">Packet to send reliably.</param>
  public void AddToQueue(NetPacket packet)
  {
      Queue<NetPacket> queue = _outgoingPackets;
      lock (queue)
      {
          queue.Enqueue(packet);
      }
  }
  /// <summary>
  /// Moves packets from the outgoing queue into free send-window slots, assigning
  /// each one the next local sequence number, until the queue is empty or the
  /// window is full. Called from SendNextPacket while it holds the _pendingPackets lock.
  /// </summary>
  private void ProcessQueuedPackets()
  {
      while (_outgoingPackets.Count > 0)
      {
          // Stop as soon as the next sequence would fall outside the send window.
          if (NetUtils.RelativeSequenceNumber(_localSeqence, _localWindowStart) >= _windowSize)
          {
              break;
          }

          NetPacket next;
          lock (_outgoingPackets)
          {
              next = _outgoingPackets.Dequeue();
          }

          int sequence = _localSeqence;
          next.Sequence = (ushort)sequence;
          _pendingPackets[sequence % _windowSize].Packet = next;
          _localSeqence = (sequence + 1) % NetConstants.MaxSequence;
      }
  }
  /// <summary>
  /// Sends at most one packet from the send window: either one that has never
  /// been sent, or one whose last send is older than the peer's ResendDelay.
  /// Queued packets are first moved into the window.
  /// </summary>
  /// <returns>True if a packet was sent; false if nothing was due.</returns>
  public bool SendNextPacket()
  {
      DateTime currentTime = DateTime.UtcNow;

      // lock (rather than a bare Monitor.Enter/Exit pair) releases the monitor even
      // if SendRawData or anything else in here throws.
      lock (_pendingPackets)
      {
          ProcessQueuedPackets();

          //send
          PendingPacket currentPacket;
          bool packetFound = false;
          int startQueueIndex = _queueIndex;
          do
          {
              currentPacket = _pendingPackets[_queueIndex];
              if (currentPacket.Packet != null)
              {
                  //check send time
                  if (currentPacket.TimeStamp.HasValue)
                  {
                      double packetHoldTime = (currentTime - currentPacket.TimeStamp.Value).TotalMilliseconds;
                      if (packetHoldTime > _peer.ResendDelay)
                      {
                          NetUtils.DebugWrite("[RC]Resend: {0} > {1}", (int)packetHoldTime, _peer.ResendDelay);
                          packetFound = true;
                      }
                  }
                  else //Never sent
                  {
                      packetFound = true;
                  }
              }

              // The cursor always advances, so the next call resumes after the slot
              // examined here and slots are served round-robin.
              _queueIndex = (_queueIndex + 1) % _windowSize;
          } while (!packetFound && _queueIndex != startQueueIndex);

          if (packetFound)
          {
              currentPacket.TimeStamp = DateTime.UtcNow;
              _peer.SendRawData(currentPacket.Packet);
              NetUtils.DebugWrite("[RR]Sended");
          }

          return packetFound;
      }
  }
  /// <summary>
  /// Sends an acks packet describing the current receive window, if any packet
  /// was accepted since the previous call. The packet's Sequence is the receive
  /// window start and bit i of the payload reports sequence (window start + i).
  /// </summary>
  public void SendAcks()
  {
      if (!_mustSendAcks)
      {
          return;
      }
      _mustSendAcks = false;

      NetUtils.DebugWrite("[RR]SendAcks");

      //Init packet: one bit per window slot, rounded up to whole bytes
      int bytesCount = (_windowSize - 1) / BitsInByte + 1;
      PacketProperty property = _ordered ? PacketProperty.AckReliableOrdered : PacketProperty.AckReliable;
      var acksPacket = _peer.GetPacketFromPool(property, bytesCount);

      //For quick access
      byte[] data = acksPacket.RawData; //window start + acks size

      // lock (rather than a bare Monitor.Enter/Exit pair) releases the monitor even
      // if the loop throws.
      lock (_outgoingAcks)
      {
          //Put window start
          acksPacket.Sequence = (ushort)_remoteWindowStart;

          //Put acks, walking the circular flag array once starting at the window start
          int startAckIndex = _remoteWindowStart % _windowSize;
          int currentAckIndex = startAckIndex;
          int currentBit = 0;
          int currentByte = NetConstants.SequencedHeaderSize;
          do
          {
              // Bits are only ever set here, never cleared.
              // NOTE(review): this relies on GetPacketFromPool returning a zeroed payload - confirm.
              if (_outgoingAcks[currentAckIndex])
              {
                  data[currentByte] |= (byte)(1 << currentBit);
              }

              currentBit++;
              if (currentBit == BitsInByte)
              {
                  currentByte++;
                  currentBit = 0;
              }
              currentAckIndex = (currentAckIndex + 1) % _windowSize;
          } while (currentAckIndex != startAckIndex);
      }

      _peer.SendRawData(acksPacket);
      _peer.Recycle(acksPacket);
  }
  /// <summary>
  /// Handles an incoming reliable packet: validates its sequence number against
  /// the receive window, records the ack to be sent back, drops duplicates, and
  /// passes the packet to the peer (immediately, or once its turn comes in
  /// ordered mode).
  /// </summary>
  /// <param name="packet">Received packet carrying its sequence number in Sequence.</param>
  public void ProcessPacket(NetPacket packet)
  {
      if (packet.Sequence >= NetConstants.MaxSequence)
      {
          NetUtils.DebugWrite("[RR]Bad sequence");
          return;
      }

      int relate = NetUtils.RelativeSequenceNumber(packet.Sequence, _remoteWindowStart);
      int relateSeq = NetUtils.RelativeSequenceNumber(packet.Sequence, _remoteSequence);

      if (relateSeq > _windowSize)
      {
          NetUtils.DebugWrite("[RR]Bad sequence");
          return;
      }

      //Drop bad packets
      if (relate < 0)
      {
          //Too old packet doesn't ack
          NetUtils.DebugWrite("[RR]ReliableInOrder too old");
          return;
      }
      if (relate >= _windowSize * 2)
      {
          //Some very new packet
          NetUtils.DebugWrite("[RR]ReliableInOrder too new");
          return;
      }

      // lock (rather than Monitor.Enter with a manual Exit on every return path)
      // releases the monitor on the early return below and on any exception.
      lock (_outgoingAcks)
      {
          //If very new - move window
          if (relate >= _windowSize)
          {
              //New window position: the packet becomes the last slot of the window
              int newWindowStart = (_remoteWindowStart + relate - _windowSize + 1) % NetConstants.MaxSequence;

              //Clean old data
              while (_remoteWindowStart != newWindowStart)
              {
                  _outgoingAcks[_remoteWindowStart % _windowSize] = false;
                  _remoteWindowStart = (_remoteWindowStart + 1) % NetConstants.MaxSequence;
              }
          }

          //Final stage - process valid packet
          //trigger acks send (also for duplicates, so the ack is sent again)
          _mustSendAcks = true;

          int ackIndex = packet.Sequence % _windowSize;
          if (_outgoingAcks[ackIndex])
          {
              NetUtils.DebugWrite("[RR]ReliableInOrder duplicate");
              return;
          }

          //save ack
          _outgoingAcks[ackIndex] = true;
      }

      //detailed check
      if (packet.Sequence == _remoteSequence)
      {
          NetUtils.DebugWrite("[RR]ReliableInOrder packet succes");
          _peer.AddIncomingPacket(packet);
          _remoteSequence = (_remoteSequence + 1) % NetConstants.MaxSequence;

          if (_ordered)
          {
              // Deliver any packets that were held back waiting for this one.
              NetPacket p;
              while ((p = _receivedPackets[_remoteSequence % _windowSize]) != null)
              {
                  //process held packet
                  _receivedPackets[_remoteSequence % _windowSize] = null;
                  _peer.AddIncomingPacket(p);
                  _remoteSequence = (_remoteSequence + 1) % NetConstants.MaxSequence;
              }
          }
          else
          {
              // Skip over sequences that were already delivered early.
              while (_earlyReceived[_remoteSequence % _windowSize])
              {
                  //process early packet
                  _earlyReceived[_remoteSequence % _windowSize] = false;
                  _remoteSequence = (_remoteSequence + 1) % NetConstants.MaxSequence;
              }
          }

          return;
      }

      //held packet: arrived ahead of the expected sequence
      if (_ordered)
      {
          // Keep it until every earlier sequence has been delivered.
          _receivedPackets[packet.Sequence % _windowSize] = packet;
      }
      else
      {
          // Deliver right away and remember that this sequence is done.
          _earlyReceived[packet.Sequence % _windowSize] = true;
          _peer.AddIncomingPacket(packet);
      }
  }
  322. }
  323. }
  324. #endif