Kcp.cs 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244
  1. // Kcp based on https://github.com/skywind3000/kcp
  2. // Kept as close to original as possible.
  3. using System;
  4. using System.Buffers;
  5. using System.Collections.Generic;
  6. using System.Diagnostics.CodeAnalysis;
  7. using System.Runtime.CompilerServices;
  8. using System.Runtime.InteropServices;
  9. namespace ET
  10. {
  11. public partial class Kcp
  12. {
  13. // original Kcp has a define option, which is not defined by default:
  14. // #define FASTACK_CONSERVE
  15. public const int RTO_NDL = 30; // no delay min rto
  16. public const int RTO_MIN = 100; // normal min rto
  17. public const int RTO_DEF = 200; // default RTO
  18. public const int RTO_MAX = 60000; // maximum RTO
  19. public const int CMD_PUSH = 81; // cmd: push data
  20. public const int CMD_ACK = 82; // cmd: ack
  21. public const int CMD_WASK = 83; // cmd: window probe (ask)
  22. public const int CMD_WINS = 84; // cmd: window size (tell/insert)
  23. public const int ASK_SEND = 1; // need to send CMD_WASK
  24. public const int ASK_TELL = 2; // need to send CMD_WINS
  25. public const int WND_SND = 32; // default send window
  26. public const int WND_RCV = 128; // default receive window. must be >= max fragment size
  27. public const int MTU_DEF = 1200; // default MTU (reduced to 1200 to fit all cases: https://en.wikipedia.org/wiki/Maximum_transmission_unit ; steam uses 1200 too!)
  28. public const int ACK_FAST = 3;
  29. public const int INTERVAL = 100;
  30. public const int OVERHEAD = 24;
  31. public const int FRG_MAX = byte.MaxValue; // kcp encodes 'frg' as byte. so we can only ever send up to 255 fragments.
  32. public const int DEADLINK = 20; // default maximum amount of 'xmit' retransmissions until a segment is considered lost
  33. public const int THRESH_INIT = 2;
  34. public const int THRESH_MIN = 2;
  35. public const int PROBE_INIT = 7000; // 7 secs to probe window size
  36. public const int PROBE_LIMIT = 120000; // up to 120 secs to probe window
  37. public const int FASTACK_LIMIT = 5; // max times to trigger fastack
  38. public const int RESERVED_BYTE = 5; // 包头预留字节数 供et网络层使用
  39. // kcp members.
  40. internal int state;
  41. readonly uint conv; // conversation
  42. internal uint mtu;
  43. internal uint mss; // maximum segment size := MTU - OVERHEAD
  44. internal uint snd_una; // unacknowledged. e.g. snd_una is 9 it means 8 has been confirmed, 9 and 10 have been sent
  45. internal uint snd_nxt; // forever growing send counter for sequence numbers
  46. internal uint rcv_nxt; // forever growing receive counter for sequence numbers
  47. internal uint ssthresh; // slow start threshold
  48. internal int rx_rttval; // average deviation of rtt, used to measure the jitter of rtt
  49. internal int rx_srtt; // smoothed round trip time (a weighted average of rtt)
  50. internal int rx_rto;
  51. internal int rx_minrto;
  52. internal uint snd_wnd; // send window
  53. internal uint rcv_wnd; // receive window
  54. internal uint rmt_wnd; // remote window
  55. internal uint cwnd; // congestion window
  56. internal uint probe;
  57. internal uint interval;
  58. internal uint ts_flush; // last flush timestamp in milliseconds
  59. internal uint xmit;
  60. internal uint nodelay; // not a bool. original Kcp has '<2 else' check.
  61. internal bool updated;
  62. internal uint ts_probe; // probe timestamp
  63. internal uint probe_wait;
  64. internal uint dead_link; // maximum amount of 'xmit' retransmissions until a segment is considered lost
  65. internal uint incr;
  66. internal uint current; // current time (milliseconds). set by Update.
  67. internal int fastresend;
  68. internal int fastlimit;
  69. internal bool nocwnd; // congestion control, negated. heavily restricts send/recv window sizes.
  70. internal readonly Queue<SegmentStruct> snd_queue = new Queue<SegmentStruct>(16); // send queue
  71. internal readonly Queue<SegmentStruct> rcv_queue = new Queue<SegmentStruct>(16); // receive queue
  72. // snd_buffer needs index removals.
  73. // C# LinkedList allocates for each entry, so let's keep List for now.
  74. internal readonly List<SegmentStruct> snd_buf = new List<SegmentStruct>(16); // send buffer
  75. // rcv_buffer needs index insertions and backwards iteration.
  76. // C# LinkedList allocates for each entry, so let's keep List for now.
  77. internal readonly List<SegmentStruct> rcv_buf = new List<SegmentStruct>(16); // receive buffer
  78. internal readonly List<AckItem> acklist = new List<AckItem>(16);
  79. private ArrayPool<byte> kcpSegmentArrayPool;
  80. // memory buffer
  81. // size depends on MTU.
  82. // MTU can be changed at runtime, which resizes the buffer.
  83. internal byte[] buffer;
  84. // output function of type <buffer, size>
  85. readonly Action<byte[], int> output;
  86. // segment pool to avoid allocations in C#.
  87. // this is not part of the original C code.
  88. // readonly Pool<Segment> SegmentPool = new Pool<Segment>(
  89. // // create new segment
  90. // () => new Segment(),
  91. // // reset segment before reuse
  92. // (segment) => segment.Reset(),
  93. // // initial capacity
  94. // 32
  95. // );
  96. // ikcp_create
  97. // create a new kcp control object, 'conv' must equal in two endpoint
  98. // from the same connection.
  99. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  100. public Kcp(uint conv, Action<byte[], int> output)
  101. {
  102. this.conv = conv;
  103. this.output = output;
  104. snd_wnd = WND_SND;
  105. rcv_wnd = WND_RCV;
  106. rmt_wnd = WND_RCV;
  107. mtu = MTU_DEF;
  108. mss = mtu - OVERHEAD;
  109. rx_rto = RTO_DEF;
  110. rx_minrto = RTO_MIN;
  111. interval = INTERVAL;
  112. ts_flush = INTERVAL;
  113. ssthresh = THRESH_INIT;
  114. fastlimit = FASTACK_LIMIT;
  115. dead_link = DEADLINK;
  116. buffer = new byte[(mtu + OVERHEAD) * 3];
  117. }
  118. // ikcp_segment_new
  119. // we keep the original function and add our pooling to it.
  120. // this way we'll never miss it anywhere.
  121. // [MethodImpl(MethodImplOptions.AggressiveInlining)]
  122. // Segment SegmentNew() => SegmentPool.Take();
  123. // ikcp_segment_delete
  124. // we keep the original function and add our pooling to it.
  125. // this way we'll never miss it anywhere.
  126. // [MethodImpl(MethodImplOptions.AggressiveInlining)]
  127. // void SegmentDelete(Segment seg) => SegmentPool.Return(seg);
  128. // calculate how many packets are waiting to be sent
  129. public int WaitSnd => snd_buf.Count + snd_queue.Count;
  130. // ikcp_wnd_unused
  131. // returns the remaining space in receive window (rcv_wnd - rcv_queue)
  132. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  133. internal ushort WndUnused()
  134. {
  135. if (rcv_queue.Count < rcv_wnd)
  136. return (ushort)(rcv_wnd - (uint)rcv_queue.Count);
  137. return 0;
  138. }
  139. public int Receive(Span<byte> data)
  140. {
  141. // kcp's ispeek feature is not supported.
  142. // this makes 'merge fragment' code significantly easier because
  143. // we can iterate while queue.Count > 0 and dequeue each time.
  144. // if we had to consider ispeek then count would always be > 0 and
  145. // we would have to remove only after the loop.
  146. //
  147. int len = data.Length;
  148. if (rcv_queue.Count == 0)
  149. return -1;
  150. int peeksize = PeekSize();
  151. if (peeksize < 0)
  152. return -2;
  153. if (peeksize > len)
  154. return -3;
  155. bool recover = rcv_queue.Count >= rcv_wnd;
  156. // merge fragment.
  157. len = 0;
  158. ref byte dest = ref MemoryMarshal.GetReference(data);
  159. // original KCP iterates rcv_queue and deletes if !ispeek.
  160. // removing from a c# queue while iterating is not possible, but
  161. // we can change to 'while Count > 0' and remove every time.
  162. // (we can remove every time because we removed ispeek support!)
  163. while (rcv_queue.Count > 0)
  164. {
  165. // unlike original kcp, we dequeue instead of just getting the
  166. // entry. this is fine because we remove it in ANY case.
  167. SegmentStruct seg = rcv_queue.Dequeue();
  168. // copy segment data into our buffer
  169. ref byte source = ref MemoryMarshal.GetReference(seg.WrittenBuffer);
  170. Unsafe.CopyBlockUnaligned(ref dest,ref source,(uint)seg.WrittenCount);
  171. dest = ref Unsafe.Add(ref dest, (uint) seg.WrittenCount);
  172. len += seg.WrittenCount;
  173. uint fragment = seg.SegHead.frg;
  174. // note: ispeek is not supported in order to simplify this loop
  175. // unlike original kcp, we don't need to remove seg from queue
  176. // because we already dequeued it.
  177. // simply delete it
  178. seg.Dispose();
  179. if (fragment == 0)
  180. break;
  181. }
  182. // move available data from rcv_buf -> rcv_queue
  183. int removed = 0;
  184. #if NET7_0_OR_GREATER
  185. foreach (ref SegmentStruct seg in CollectionsMarshal.AsSpan(this.rcv_buf))
  186. #else
  187. foreach (SegmentStruct seg in rcv_buf)
  188. #endif
  189. {
  190. if (seg.SegHead.sn == rcv_nxt && rcv_queue.Count < rcv_wnd)
  191. {
  192. // can't remove while iterating. remember how many to remove
  193. // and do it after the loop.
  194. // note: don't return segment. we only add it to rcv_queue
  195. ++removed;
  196. // add
  197. rcv_queue.Enqueue(seg);
  198. // increase sequence number for next segment
  199. rcv_nxt++;
  200. }
  201. else
  202. {
  203. break;
  204. }
  205. }
  206. rcv_buf.RemoveRange(0, removed);
  207. // fast recover
  208. if (rcv_queue.Count < rcv_wnd && recover)
  209. {
  210. // ready to send back CMD_WINS in flush
  211. // tell remote my window size
  212. probe |= ASK_TELL;
  213. }
  214. return len;
  215. }
  216. // ikcp_peeksize
  217. // check the size of next message in the recv queue.
  218. // returns -1 if there is no message, or if the message is still incomplete.
  219. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  220. public int PeekSize()
  221. {
  222. int length = 0;
  223. // empty queue?
  224. if (rcv_queue.Count == 0) return -1;
  225. // peek the first segment
  226. SegmentStruct seq = rcv_queue.Peek();
  227. // seg.frg is 0 if the message requires no fragmentation.
  228. // in that case, the segment's size is the final message size.
  229. if (seq.SegHead.frg == 0) return seq.WrittenCount;
  230. // check if all fragment parts were received yet.
  231. // seg.frg is the n-th fragment, but in reverse.
  232. // this way the first received segment tells us how many fragments there are for the message.
  233. // for example, if a message contains 3 segments:
  234. // first segment: .frg is 2 (index in reverse)
  235. // second segment: .frg is 1 (index in reverse)
  236. // third segment: .frg is 0 (index in reverse)
  237. if (rcv_queue.Count < seq.SegHead.frg + 1) return -1;
  238. // recv_queue contains all the fragments necessary to reconstruct the message.
  239. // sum all fragment's sizes to get the full message size.
  240. foreach (SegmentStruct seg in rcv_queue)
  241. {
  242. length += seg.WrittenCount;
  243. if (seg.SegHead.frg == 0) break;
  244. }
  245. return length;
  246. }
  247. // ikcp_send
  248. // splits message into MTU sized fragments, adds them to snd_queue.
  249. public int Send(ReadOnlySpan<byte> data)
  250. {
  251. // fragment count
  252. int count;
  253. int len = data.Length;
  254. int offset = 0;
  255. // streaming mode: removed. we never want to send 'hello' and
  256. // receive 'he' 'll' 'o'. we want to always receive 'hello'.
  257. // calculate amount of fragments necessary for 'len'
  258. if (len <= mss) count = 1;
  259. else count = (int)((len + mss - 1) / mss);
  260. // IMPORTANT kcp encodes 'frg' as 1 byte.
  261. // so we can only support up to 255 fragments.
  262. // (which limits max message size to around 288 KB)
  263. // this is difficult to debug. let's make this 100% obvious.
  264. if (count > FRG_MAX)
  265. ThrowFrgCountException(len,count);
  266. // original kcp uses WND_RCV const instead of rcv_wnd runtime:
  267. // https://github.com/skywind3000/kcp/pull/291/files
  268. // which always limits max message size to 144 KB:
  269. //if (count >= WND_RCV) return -2;
  270. // using configured rcv_wnd uncorks max message size to 'any':
  271. if (count >= rcv_wnd) return -2;
  272. if (count == 0) count = 1;
  273. ref byte dataRef = ref MemoryMarshal.GetReference(data);
  274. // fragment
  275. for (int i = 0; i < count; i++)
  276. {
  277. int size = len > (int)mss ? (int)mss : len;
  278. SegmentStruct seg = new SegmentStruct(size,this.kcpSegmentArrayPool);
  279. if (len > 0)
  280. {
  281. Unsafe.CopyBlockUnaligned(ref MemoryMarshal.GetReference(seg.FreeBuffer),ref dataRef,(uint)size);
  282. dataRef = ref Unsafe.Add(ref dataRef, size);
  283. seg.Advance(size);
  284. }
  285. seg.SegHead.frg = (byte)(count - i - 1);
  286. snd_queue.Enqueue(seg);
  287. offset += size;
  288. len -= size;
  289. }
  290. return 0;
  291. }
  292. // ikcp_update_ack
  293. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  294. void UpdateAck(int rtt) // round trip time
  295. {
  296. // https://tools.ietf.org/html/rfc6298
  297. if (rx_srtt == 0)
  298. {
  299. rx_srtt = rtt;
  300. rx_rttval = rtt / 2;
  301. }
  302. else
  303. {
  304. int delta = rtt - rx_srtt;
  305. if (delta < 0) delta = -delta;
  306. rx_rttval = (3 * rx_rttval + delta) / 4;
  307. rx_srtt = (7 * rx_srtt + rtt) / 8;
  308. if (rx_srtt < 1) rx_srtt = 1;
  309. }
  310. int rto = rx_srtt + Math.Max((int)interval, 4 * rx_rttval);
  311. rx_rto = Utils.Clamp(rto, rx_minrto, RTO_MAX);
  312. }
  313. // ikcp_shrink_buf
  314. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  315. internal void ShrinkBuf()
  316. {
  317. if (snd_buf.Count > 0)
  318. {
  319. SegmentStruct seg = snd_buf[0];
  320. snd_una = seg.SegHead.sn;
  321. }
  322. else
  323. {
  324. snd_una = snd_nxt;
  325. }
  326. }
  327. // ikcp_parse_ack
  328. // removes the segment with 'sn' from send buffer
  329. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  330. internal void ParseAck(uint sn)
  331. {
  332. if (Utils.TimeDiff(sn, snd_una) < 0 || Utils.TimeDiff(sn, snd_nxt) >= 0)
  333. return;
  334. // for-int so we can erase while iterating
  335. bool needRemove = false;
  336. int removeIndex = 0;
  337. #if NET7_0_OR_GREATER
  338. foreach (ref var seg in CollectionsMarshal.AsSpan(snd_buf))
  339. #else
  340. foreach (var seg in snd_buf)
  341. #endif
  342. {
  343. // is this the segment?
  344. if (sn == seg.SegHead.sn)
  345. {
  346. // remove and return
  347. needRemove = true;
  348. // SegmentDelete(seg);
  349. seg.Dispose();
  350. break;
  351. }
  352. if (Utils.TimeDiff(sn, seg.SegHead.sn) < 0)
  353. {
  354. break;
  355. }
  356. removeIndex++;
  357. }
  358. if (needRemove)
  359. {
  360. snd_buf.RemoveAt(removeIndex);
  361. }
  362. }
  363. // ikcp_parse_una
  364. // removes all unacknowledged segments with sequence numbers < una from send buffer
  365. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  366. internal void ParseUna(uint una)
  367. {
  368. int removed = 0;
  369. #if NET7_0_OR_GREATER
  370. foreach (ref SegmentStruct seg in CollectionsMarshal.AsSpan(snd_buf))
  371. #else
  372. foreach (SegmentStruct seg in snd_buf)
  373. #endif
  374. {
  375. // if (Utils.TimeDiff(una, seg.sn) > 0)
  376. if (seg.SegHead.sn < una)
  377. {
  378. // can't remove while iterating. remember how many to remove
  379. // and do it after the loop.
  380. ++removed;
  381. // SegmentDelete(seg);
  382. seg.Dispose();
  383. }
  384. else
  385. {
  386. break;
  387. }
  388. }
  389. snd_buf.RemoveRange(0, removed);
  390. }
  391. // ikcp_parse_fastack
  392. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  393. internal void ParseFastack(uint sn, uint ts) // serial number, timestamp
  394. {
  395. // sn needs to be between snd_una and snd_nxt
  396. // if !(snd_una <= sn && sn < snd_nxt) return;
  397. // if (Utils.TimeDiff(sn, snd_una) < 0)
  398. if (sn < snd_una)
  399. return;
  400. // if (Utils.TimeDiff(sn, snd_nxt) >= 0)
  401. if (sn >= snd_nxt)
  402. return;
  403. #if NET7_0_OR_GREATER
  404. foreach (ref var seg in CollectionsMarshal.AsSpan(snd_buf))
  405. {
  406. // if (Utils.TimeDiff(sn, seg.sn) < 0)
  407. if (sn < seg.SegHead.sn)
  408. {
  409. break;
  410. }
  411. else if (sn != seg.SegHead.sn)
  412. {
  413. #if !FASTACK_CONSERVE
  414. seg.fastack++;
  415. #else
  416. if (Utils.TimeDiff(ts, seg.SegHead.ts) >= 0)
  417. {
  418. seg.fastack++;
  419. }
  420. #endif
  421. }
  422. }
  423. #else
  424. for (int i = 0; i < this.snd_buf.Count; i++)
  425. {
  426. SegmentStruct seg = this.snd_buf[i];
  427. // if (Utils.TimeDiff(sn, seg.sn) < 0)
  428. if (sn < seg.SegHead.sn)
  429. {
  430. break;
  431. }
  432. else if (sn != seg.SegHead.sn)
  433. {
  434. #if !FASTACK_CONSERVE
  435. seg.fastack++;
  436. this.snd_buf[i] = seg;
  437. #else
  438. if (Utils.TimeDiff(ts, seg.SegHead.ts) >= 0)
  439. {
  440. seg.fastack++;
  441. this.snd_buf[i] = seg;
  442. }
  443. #endif
  444. }
  445. }
  446. #endif
  447. }
  448. // ikcp_ack_push
  449. // appends an ack.
  450. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  451. void AckPush(uint sn, uint ts) // serial number, timestamp
  452. {
  453. acklist.Add(new AckItem{ serialNumber = sn, timestamp = ts });
  454. }
  455. // ikcp_parse_data
  456. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  457. void ParseData(ref SegmentStruct newseg)
  458. {
  459. uint sn = newseg.SegHead.sn;
  460. if (Utils.TimeDiff(sn, rcv_nxt + rcv_wnd) >= 0 ||
  461. Utils.TimeDiff(sn, rcv_nxt) < 0)
  462. {
  463. newseg.Dispose();
  464. return;
  465. }
  466. InsertSegmentInReceiveBuffer(ref newseg);
  467. MoveReceiveBufferReadySegmentsToQueue();
  468. }
  469. // inserts the segment into rcv_buf, ordered by seg.sn.
  470. // drops the segment if one with the same seg.sn already exists.
  471. // goes through receive buffer in reverse order for performance.
  472. //
  473. // note: see KcpTests.InsertSegmentInReceiveBuffer test!
  474. // note: 'insert or delete' can be done in different ways, but let's
  475. // keep consistency with original C kcp.
  476. internal void InsertSegmentInReceiveBuffer(ref SegmentStruct newseg)
  477. {
  478. bool repeat = false; // 'duplicate'
  479. // original C iterates backwards, so we need to do that as well.
  480. // note if rcv_buf.Count == 0, i becomes -1 and no looping happens.
  481. int i;
  482. #if NET7_0_OR_GREATER
  483. Span<SegmentStruct> arr = CollectionsMarshal.AsSpan(rcv_buf);
  484. for (i = arr.Length-1; i>=0 ; i--)
  485. {
  486. ref SegmentStruct seg =ref arr[i];
  487. #else
  488. for (i = rcv_buf.Count - 1; i >= 0; i--)
  489. {
  490. SegmentStruct seg = rcv_buf[i];
  491. #endif
  492. if (seg.SegHead.sn == newseg.SegHead.sn)
  493. {
  494. // duplicate segment found. nothing will be added.
  495. repeat = true;
  496. break;
  497. }
  498. if (Utils.TimeDiff(newseg.SegHead.sn, seg.SegHead.sn) > 0)
  499. {
  500. // this entry's sn is < newseg.sn, so let's stop
  501. break;
  502. }
  503. }
  504. // no duplicate? then insert.
  505. if (!repeat)
  506. {
  507. rcv_buf.Insert(i + 1, newseg);
  508. }
  509. // duplicate. just delete it.
  510. else
  511. {
  512. newseg.Dispose();
  513. }
  514. }
  515. // move ready segments from rcv_buf -> rcv_queue.
  516. // moves only the ready segments which are in rcv_nxt sequence order.
  517. // some may still be missing an inserted later.
  518. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  519. void MoveReceiveBufferReadySegmentsToQueue()
  520. {
  521. int removed = 0;
  522. #if NET7_0_OR_GREATER
  523. foreach (ref var seg in CollectionsMarshal.AsSpan(rcv_buf))
  524. #else
  525. foreach (var seg in rcv_buf)
  526. #endif
  527. {
  528. // move segments while they are in 'rcv_nxt' sequence order.
  529. // some may still be missing and inserted later, in this case it stops immediately
  530. // because segments always need to be received in the exact sequence order.
  531. if (seg.SegHead.sn == rcv_nxt && rcv_queue.Count < rcv_wnd)
  532. {
  533. // can't remove while iterating. remember how many to remove
  534. // and do it after the loop.
  535. ++removed;
  536. rcv_queue.Enqueue(seg);
  537. // increase sequence number for next segment
  538. rcv_nxt++;
  539. }
  540. else
  541. {
  542. break;
  543. }
  544. }
  545. rcv_buf.RemoveRange(0, removed);
  546. }
  547. // ikcp_input
  548. // used when you receive a low level packet (e.g. UDP packet)
  549. // => original kcp uses offset=0, we made it a parameter so that high
  550. // level can skip the channel byte more easily
  551. public int Input(Span<byte> data)
  552. {
  553. int offset = 0;
  554. int size = data.Length;
  555. uint prev_una = snd_una;
  556. uint maxack = 0;
  557. uint latest_ts = 0;
  558. int flag = 0;
  559. if (data == null || size < OVERHEAD) return -1;
  560. while (true)
  561. {
  562. // enough data left to decode segment (aka OVERHEAD bytes)?
  563. if (size < OVERHEAD) break;
  564. var segHead = Unsafe.ReadUnaligned<SegmentHead>(ref MemoryMarshal.GetReference(data.Slice(offset)));
  565. offset += Unsafe.SizeOf<SegmentHead>();
  566. uint conv_ = segHead.conv;
  567. byte cmd = segHead.cmd;
  568. byte frg = segHead.frg;
  569. ushort wnd = segHead.wnd;
  570. uint ts = segHead.ts;
  571. uint sn = segHead.sn;
  572. uint una = segHead.una;
  573. uint len = segHead.len;
  574. // reduce remaining size by what was read
  575. size -= OVERHEAD;
  576. // enough remaining to read 'len' bytes of the actual payload?
  577. // note: original kcp casts uint len to int for <0 check.
  578. if (size < len || (int)len < 0) return -2;
  579. // validate command type
  580. if (cmd != CMD_PUSH && cmd != CMD_ACK &&
  581. cmd != CMD_WASK && cmd != CMD_WINS)
  582. return -3;
  583. rmt_wnd = wnd;
  584. ParseUna(una);
  585. ShrinkBuf();
  586. if (cmd == CMD_ACK)
  587. {
  588. if (Utils.TimeDiff(current, ts) >= 0)
  589. {
  590. UpdateAck(Utils.TimeDiff(current, ts));
  591. }
  592. ParseAck(sn);
  593. ShrinkBuf();
  594. if (flag == 0)
  595. {
  596. flag = 1;
  597. maxack = sn;
  598. latest_ts = ts;
  599. }
  600. else
  601. {
  602. if (Utils.TimeDiff(sn, maxack) > 0)
  603. {
  604. #if !FASTACK_CONSERVE
  605. maxack = sn;
  606. latest_ts = ts;
  607. #else
  608. if (Utils.TimeDiff(ts, latest_ts) > 0)
  609. {
  610. maxack = sn;
  611. latest_ts = ts;
  612. }
  613. #endif
  614. }
  615. }
  616. }
  617. else if (cmd == CMD_PUSH)
  618. {
  619. if (Utils.TimeDiff(sn, rcv_nxt + rcv_wnd) < 0)
  620. {
  621. AckPush(sn, ts);
  622. if (Utils.TimeDiff(sn, rcv_nxt) >= 0)
  623. {
  624. SegmentStruct seg = new SegmentStruct((int) len,this.kcpSegmentArrayPool);
  625. seg.SegHead = new SegmentHead()
  626. {
  627. conv = conv_,
  628. cmd = cmd,
  629. frg = frg,
  630. wnd = wnd,
  631. ts = ts,
  632. sn = sn,
  633. una = una,
  634. };
  635. if (len > 0)
  636. {
  637. data.Slice(offset,(int)len).CopyTo(seg.FreeBuffer);
  638. seg.Advance((int)len);
  639. }
  640. ParseData(ref seg);
  641. }
  642. }
  643. }
  644. else if (cmd == CMD_WASK)
  645. {
  646. // ready to send back CMD_WINS in flush
  647. // tell remote my window size
  648. probe |= ASK_TELL;
  649. }
  650. else if (cmd == CMD_WINS)
  651. {
  652. // do nothing
  653. }
  654. else
  655. {
  656. return -3;
  657. }
  658. offset += (int)len;
  659. size -= (int)len;
  660. }
  661. if (flag != 0)
  662. {
  663. ParseFastack(maxack, latest_ts);
  664. }
  665. // cwnd update when packet arrived
  666. if (Utils.TimeDiff(snd_una, prev_una) > 0)
  667. {
  668. if (cwnd < rmt_wnd)
  669. {
  670. if (cwnd < ssthresh)
  671. {
  672. cwnd++;
  673. incr += mss;
  674. }
  675. else
  676. {
  677. if (incr < mss) incr = mss;
  678. incr += (mss * mss) / incr + (mss / 16);
  679. if ((cwnd + 1) * mss <= incr)
  680. {
  681. cwnd = (incr + mss - 1) / ((mss > 0) ? mss : 1);
  682. }
  683. }
  684. if (cwnd > rmt_wnd)
  685. {
  686. cwnd = rmt_wnd;
  687. incr = rmt_wnd * mss;
  688. }
  689. }
  690. }
  691. return 0;
  692. }
  693. // flush helper function
  694. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  695. void MakeSpace(ref int size, int space)
  696. {
  697. if (size - RESERVED_BYTE + space > mtu)
  698. {
  699. output(buffer, size);
  700. size = RESERVED_BYTE;
  701. }
  702. }
  703. // flush helper function
  704. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  705. void FlushBuffer(int size)
  706. {
  707. // flush buffer up to 'offset' (<= MTU)
  708. if (size > RESERVED_BYTE)
  709. {
  710. output(buffer, size);
  711. }
  712. }
  713. // ikcp_flush
  714. // flush remain ack segments.
  715. // flush may output multiple <= MTU messages from MakeSpace / FlushBuffer.
  716. // the amount of messages depends on the sliding window.
  717. // configured by send/receive window sizes + congestion control.
  718. // with congestion control, the window will be extremely small(!).
  719. public void Flush()
  720. {
  721. int size = RESERVED_BYTE; // amount of bytes to flush. 'buffer ptr' in C.
  722. bool lost = false; // lost segments
  723. // update needs to be called before flushing
  724. if (!updated) return;
  725. // kcp only stack allocates a segment here for performance, leaving
  726. // its data buffer null because this segment's data buffer is never
  727. // used. that's fine in C, but in C# our segment is a class so we
  728. // need to allocate and most importantly, not forget to deallocate
  729. // it before returning.
  730. SegmentStruct seg = new SegmentStruct((int)this.mtu,this.kcpSegmentArrayPool);
  731. seg.SegHead.conv = conv;
  732. seg.SegHead.cmd = CMD_ACK;
  733. seg.SegHead.wnd = WndUnused();
  734. seg.SegHead.una = rcv_nxt;
  735. // flush acknowledges
  736. #if NET7_0_OR_GREATER
  737. foreach (ref AckItem ack in CollectionsMarshal.AsSpan(acklist))
  738. #else
  739. foreach (AckItem ack in acklist)
  740. #endif
  741. {
  742. MakeSpace(ref size, OVERHEAD);
  743. // ikcp_ack_get assigns ack[i] to seg.sn, seg.ts
  744. seg.SegHead.sn = ack.serialNumber;
  745. seg.SegHead.ts = ack.timestamp;
  746. seg.Encode(buffer.AsSpan(size), ref size);
  747. }
  748. acklist.Clear();
  749. // probe window size (if remote window size equals zero)
  750. if (rmt_wnd == 0)
  751. {
  752. if (probe_wait == 0)
  753. {
  754. probe_wait = PROBE_INIT;
  755. ts_probe = current + probe_wait;
  756. }
  757. else
  758. {
  759. if (Utils.TimeDiff(current, ts_probe) >= 0)
  760. {
  761. if (probe_wait < PROBE_INIT)
  762. probe_wait = PROBE_INIT;
  763. probe_wait += probe_wait / 2;
  764. if (probe_wait > PROBE_LIMIT)
  765. probe_wait = PROBE_LIMIT;
  766. ts_probe = current + probe_wait;
  767. probe |= ASK_SEND;
  768. }
  769. }
  770. }
  771. else
  772. {
  773. ts_probe = 0;
  774. probe_wait = 0;
  775. }
  776. // flush window probing commands
  777. if ((probe & ASK_SEND) != 0)
  778. {
  779. seg.SegHead.cmd = CMD_WASK;
  780. MakeSpace(ref size, OVERHEAD);
  781. seg.Encode(buffer.AsSpan(size), ref size);
  782. }
  783. // flush window probing commands
  784. if ((probe & ASK_TELL) != 0)
  785. {
  786. seg.SegHead.cmd = CMD_WINS;
  787. MakeSpace(ref size, OVERHEAD);
  788. seg.Encode(buffer.AsSpan(size), ref size);
  789. }
  790. probe = 0;
  791. // calculate the window size which is currently safe to send.
  792. // it's send window, or remote window, whatever is smaller.
  793. // for our max
  794. uint cwnd_ = Math.Min(snd_wnd, rmt_wnd);
  795. // double negative: if congestion window is enabled:
  796. // limit window size to cwnd.
  797. //
  798. // note this may heavily limit window sizes.
  799. // for our max message size test with super large windows of 32k,
  800. // 'congestion window' limits it down from 32.000 to 2.
  801. if (!nocwnd) cwnd_ = Math.Min(cwnd, cwnd_);
  802. // move cwnd_ 'window size' messages from snd_queue to snd_buf
  803. // 'snd_nxt' is what we want to send.
  804. // 'snd_una' is what hasn't been acked yet.
  805. // copy up to 'cwnd_' difference between them (sliding window)
  806. while (Utils.TimeDiff(snd_nxt, snd_una + cwnd_) < 0)
  807. {
  808. if (snd_queue.Count == 0) break;
  809. SegmentStruct newseg = snd_queue.Dequeue();
  810. newseg.SegHead.conv = conv;
  811. newseg.SegHead.cmd = CMD_PUSH;
  812. newseg.SegHead.wnd = seg.SegHead.wnd;
  813. newseg.SegHead.ts = current;
  814. newseg.SegHead.sn = snd_nxt;
  815. snd_nxt += 1; // increase sequence number for next segment
  816. newseg.SegHead.una = rcv_nxt;
  817. newseg.resendts = current;
  818. newseg.rto = rx_rto;
  819. newseg.fastack = 0;
  820. newseg.xmit = 0;
  821. snd_buf.Add(newseg);
  822. }
  823. // calculate resent
  824. uint resent = fastresend > 0 ? (uint)fastresend : 0xffffffff;
  825. uint rtomin = nodelay == 0 ? (uint)rx_rto >> 3 : 0;
  826. // flush data segments
  827. int change = 0;
  828. #if NET7_0_OR_GREATER
  829. var sndBufArr = CollectionsMarshal.AsSpan(this.snd_buf);
  830. for (int i = 0; i < sndBufArr.Length; i++)
  831. {
  832. ref SegmentStruct segment = ref sndBufArr[i];
  833. #else
  834. for (int i = 0; i < this.snd_buf.Count; i++)
  835. {
  836. SegmentStruct segment = this.snd_buf[i];
  837. #endif
  838. bool needsend = false;
  839. // initial transmit
  840. if (segment.xmit == 0)
  841. {
  842. needsend = true;
  843. segment.xmit++;
  844. segment.rto = this.rx_rto;
  845. segment.resendts = this.current + (uint) segment.rto + rtomin;
  846. }
  847. // RTO
  848. else if (Utils.TimeDiff(this.current, segment.resendts) >= 0)
  849. {
  850. needsend = true;
  851. segment.xmit++;
  852. this.xmit++;
  853. if (this.nodelay == 0)
  854. {
  855. segment.rto += Math.Max(segment.rto, this.rx_rto);
  856. }
  857. else
  858. {
  859. int step = (this.nodelay < 2)? segment.rto : this.rx_rto;
  860. segment.rto += step / 2;
  861. }
  862. segment.resendts = this.current + (uint) segment.rto;
  863. lost = true;
  864. }
  865. // fast retransmit
  866. else if (segment.fastack >= resent)
  867. {
  868. if (segment.xmit <= this.fastlimit || this.fastlimit <= 0)
  869. {
  870. needsend = true;
  871. segment.xmit++;
  872. segment.fastack = 0;
  873. segment.resendts = this.current + (uint) segment.rto;
  874. change++;
  875. }
  876. }
  877. if (needsend)
  878. {
  879. segment.SegHead.ts = this.current;
  880. segment.SegHead.wnd = seg.SegHead.wnd;
  881. segment.SegHead.una = this.rcv_nxt;
  882. int need = OVERHEAD + segment.WrittenCount;
  883. this.MakeSpace(ref size, need);
  884. segment.Encode(this.buffer.AsSpan(size),ref size);
  885. if (segment.WrittenCount > 0)
  886. {
  887. segment.WrittenBuffer.CopyTo(this.buffer.AsSpan(size));
  888. size += segment.WrittenCount;
  889. }
  890. // dead link happens if a message was resent N times, but an
  891. // ack was still not received.
  892. if (segment.xmit >= this.dead_link)
  893. {
  894. this.state = -1;
  895. }
  896. }
  897. #if !NET7_0_OR_GREATER
  898. this.snd_buf[i] = segment;
  899. #endif
  900. }
  901. // kcp stackallocs 'seg'. our C# segment is a class though, so we
  902. // need to properly delete and return it to the pool now that we are
  903. // done with it.
  904. // SegmentDelete(seg);
  905. seg.Dispose();
  906. // flush remaining segments
  907. FlushBuffer(size);
  908. // update ssthresh
  909. // rate halving, https://tools.ietf.org/html/rfc6937
  910. if (change > 0)
  911. {
  912. uint inflight = snd_nxt - snd_una;
  913. ssthresh = inflight / 2;
  914. if (ssthresh < THRESH_MIN)
  915. ssthresh = THRESH_MIN;
  916. cwnd = ssthresh + resent;
  917. incr = cwnd * mss;
  918. }
  919. // congestion control, https://tools.ietf.org/html/rfc5681
  920. if (lost)
  921. {
  922. // original C uses 'cwnd', not kcp->cwnd!
  923. ssthresh = cwnd_ / 2;
  924. if (ssthresh < THRESH_MIN)
  925. ssthresh = THRESH_MIN;
  926. cwnd = 1;
  927. incr = mss;
  928. }
  929. if (cwnd < 1)
  930. {
  931. cwnd = 1;
  932. incr = mss;
  933. }
  934. }
  935. // ikcp_update
  936. // update state (call it repeatedly, every 10ms-100ms), or you can ask
  937. // Check() when to call it again (without Input/Send calling).
  938. //
  939. // 'current' - current timestamp in millisec. pass it to Kcp so that
  940. // Kcp doesn't have to do any stopwatch/deltaTime/etc. code
  941. //
  942. // time as uint, likely to minimize bandwidth.
  943. // uint.max = 4294967295 ms = 1193 hours = 49 days
  944. public void Update(uint currentTimeMilliSeconds)
  945. {
  946. current = currentTimeMilliSeconds;
  947. // not updated yet? then set updated and last flush time.
  948. if (!updated)
  949. {
  950. updated = true;
  951. ts_flush = current;
  952. }
  953. // slap is time since last flush in milliseconds
  954. int slap = Utils.TimeDiff(current, ts_flush);
  955. // hard limit: if 10s elapsed, always flush no matter what
  956. if (slap >= 10000 || slap < -10000)
  957. {
  958. ts_flush = current;
  959. slap = 0;
  960. }
  961. // last flush is increased by 'interval' each time.
  962. // so slap >= is a strange way to check if interval has elapsed yet.
  963. if (slap >= 0)
  964. {
  965. // increase last flush time by one interval
  966. ts_flush += interval;
  967. // if last flush is still behind, increase it to current + interval
  968. // if (Utils.TimeDiff(current, ts_flush) >= 0) // original kcp.c
  969. if (current >= ts_flush) // less confusing
  970. {
  971. ts_flush = current + interval;
  972. }
  973. Flush();
  974. }
  975. }
  976. // ikcp_check
  977. // Determine when should you invoke update
  978. // Returns when you should invoke update in millisec, if there is no
  979. // input/send calling. you can call update in that time, instead of
  980. // call update repeatly.
  981. //
  982. // Important to reduce unnecessary update invoking. use it to schedule
  983. // update (e.g. implementing an epoll-like mechanism, or optimize update
  984. // when handling massive kcp connections).
  985. public uint Check(uint current_)
  986. {
  987. uint ts_flush_ = ts_flush;
  988. // int tm_flush = 0x7fffffff; original kcp: useless assignment
  989. int tm_packet = 0x7fffffff;
  990. if (!updated)
  991. {
  992. return current_;
  993. }
  994. if (Utils.TimeDiff(current_, ts_flush_) >= 10000 ||
  995. Utils.TimeDiff(current_, ts_flush_) < -10000)
  996. {
  997. ts_flush_ = current_;
  998. }
  999. if (Utils.TimeDiff(current_, ts_flush_) >= 0)
  1000. {
  1001. return current_;
  1002. }
  1003. int tm_flush = Utils.TimeDiff(ts_flush_, current_);
  1004. #if NET7_0_OR_GREATER
  1005. foreach (ref SegmentStruct seg in CollectionsMarshal.AsSpan(this.snd_buf))
  1006. #else
  1007. foreach (SegmentStruct seg in snd_buf)
  1008. #endif
  1009. {
  1010. int diff = Utils.TimeDiff(seg.resendts, current_);
  1011. if (diff <= 0)
  1012. {
  1013. return current_;
  1014. }
  1015. if (diff < tm_packet) tm_packet = diff;
  1016. }
  1017. uint minimal = (uint)(tm_packet < tm_flush ? tm_packet : tm_flush);
  1018. if (minimal >= interval) minimal = interval;
  1019. return current_ + minimal;
  1020. }
  1021. // ikcp_setmtu
  1022. // Change MTU (Maximum Transmission Unit) size.
  1023. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  1024. public void SetMtu(uint mtu)
  1025. {
  1026. if (mtu < 50 || mtu < OVERHEAD)
  1027. this.ThrowMTUException();
  1028. buffer = new byte[(mtu + OVERHEAD) * 3];
  1029. this.mtu = mtu;
  1030. mss = mtu - OVERHEAD;
  1031. }
  1032. // ikcp_interval
  1033. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  1034. public void SetInterval(uint interval)
  1035. {
  1036. // clamp interval between 10 and 5000
  1037. if (interval > 5000) interval = 5000;
  1038. else if (interval < 10) interval = 10;
  1039. this.interval = interval;
  1040. }
  1041. // ikcp_nodelay
  1042. // configuration: https://github.com/skywind3000/kcp/blob/master/README.en.md#protocol-configuration
  1043. // nodelay : Whether nodelay mode is enabled, 0 is not enabled; 1 enabled.
  1044. // interval :Protocol internal work interval, in milliseconds, such as 10 ms or 20 ms.
  1045. // resend :Fast retransmission mode, 0 represents off by default, 2 can be set (2 ACK spans will result in direct retransmission)
  1046. // nc :Whether to turn off flow control, 0 represents “Do not turn off” by default, 1 represents “Turn off”.
  1047. // Normal Mode: ikcp_nodelay(kcp, 0, 40, 0, 0);
  1048. // Turbo Mode: ikcp_nodelay(kcp, 1, 10, 2, 1);
  1049. public void SetNoDelay(uint nodelay, uint interval = INTERVAL, int resend = 0, bool nocwnd = false)
  1050. {
  1051. this.nodelay = nodelay;
  1052. if (nodelay != 0)
  1053. {
  1054. rx_minrto = RTO_NDL;
  1055. }
  1056. else
  1057. {
  1058. rx_minrto = RTO_MIN;
  1059. }
  1060. // clamp interval between 10 and 5000
  1061. if (interval > 5000) interval = 5000;
  1062. else if (interval < 10) interval = 10;
  1063. this.interval = interval;
  1064. if (resend >= 0)
  1065. {
  1066. fastresend = resend;
  1067. }
  1068. this.nocwnd = nocwnd;
  1069. }
  1070. // ikcp_wndsize
  1071. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  1072. public void SetWindowSize(uint sendWindow, uint receiveWindow)
  1073. {
  1074. if (sendWindow > 0)
  1075. {
  1076. snd_wnd = sendWindow;
  1077. }
  1078. if (receiveWindow > 0)
  1079. {
  1080. // must >= max fragment size
  1081. rcv_wnd = Math.Max(receiveWindow, WND_RCV);
  1082. }
  1083. }
  1084. [MethodImpl(MethodImplOptions.AggressiveInlining)]
  1085. public void SetMinrto(int minrto)
  1086. {
  1087. this.rx_minrto = minrto;
  1088. }
  1089. public void SetArrayPool(ArrayPool<byte> arrayPool)
  1090. {
  1091. this.kcpSegmentArrayPool = arrayPool;
  1092. }
  1093. [DoesNotReturn]
  1094. private void ThrowMTUException()
  1095. {
  1096. throw new ArgumentException("MTU must be higher than 50 and higher than OVERHEAD");
  1097. }
  1098. [DoesNotReturn]
  1099. private void ThrowFrgCountException(int len, int count)
  1100. {
  1101. throw new Exception($"Send len={len} requires {count} fragments, but kcp can only handle up to {FRG_MAX} fragments.");
  1102. }
  1103. }
  1104. }