HTTP2WebSocketStream.cs 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. #if (!UNITY_WEBGL || UNITY_EDITOR) && !BESTHTTP_DISABLE_ALTERNATE_SSL && !BESTHTTP_DISABLE_HTTP2 && !BESTHTTP_DISABLE_WEBSOCKET
  2. using System;
  3. using System.Collections.Generic;
  4. using BestHTTP.Extensions;
  5. using BestHTTP.PlatformSupport.Memory;
  6. using BestHTTP.WebSocket;
  7. namespace BestHTTP.Connections.HTTP2
  8. {
  9. public sealed class HTTP2WebSocketStream : HTTP2Stream
  10. {
  11. public override bool HasFrameToSend
  12. {
  13. get
  14. {
  15. // Don't let the connection sleep until
  16. return this.outgoing.Count > 0 || // we already booked at least one frame in advance
  17. (this.State == HTTP2StreamStates.Open &&
  18. this.remoteWindow > 0 &&
  19. this.lastReadCount > 0 &&
  20. (this.overHTTP2.frames.Count > 0 || this.chunkQueue.Count > 0)); // we are in the middle of sending request data
  21. }
  22. }
  23. public override TimeSpan NextInteraction => this.overHTTP2.GetNextInteraction();
  24. private OverHTTP2 overHTTP2;
  25. // local list of websocket header-data pairs
  26. private List<KeyValuePair<BufferSegment, BufferSegment>> chunkQueue = new List<KeyValuePair<BufferSegment, BufferSegment>>();
  27. public HTTP2WebSocketStream(uint id, HTTP2Handler parentHandler, HTTP2SettingsManager registry, HPACKEncoder hpackEncoder) : base(id, parentHandler, registry, hpackEncoder)
  28. {
  29. }
  30. public override void Assign(HTTPRequest request)
  31. {
  32. base.Assign(request);
  33. this.overHTTP2 = request.Tag as OverHTTP2;
  34. this.overHTTP2.SetHTTP2Handler(this.parent);
  35. }
  36. protected override void ProcessIncomingDATAFrame(ref HTTP2FrameHeaderAndPayload frame, ref uint windowUpdate)
  37. {
  38. try
  39. {
  40. if (this.State != HTTP2StreamStates.HalfClosedLocal && this.State != HTTP2StreamStates.Open)
  41. {
  42. // ERROR!
  43. return;
  44. }
  45. this.downloaded += frame.PayloadLength;
  46. this.overHTTP2.OnReadThread(frame.Payload.AsBuffer((int)frame.PayloadOffset, (int)frame.PayloadLength));
  47. // frame's buffer will be released later
  48. frame.DontUseMemPool = true;
  49. // Track received data, and if necessary(local window getting too low), send a window update frame
  50. if (this.localWindow < frame.PayloadLength)
  51. {
  52. HTTPManager.Logger.Error(nameof(HTTP2WebSocketStream), string.Format("[{0}] Frame's PayloadLength ({1:N0}) is larger then local window ({2:N0}). Frame: {3}", this.Id, frame.PayloadLength, this.localWindow, frame), this.Context, this.AssignedRequest.Context, this.parent.Context);
  53. }
  54. else
  55. this.localWindow -= frame.PayloadLength;
  56. if ((frame.Flags & (byte)HTTP2DataFlags.END_STREAM) != 0)
  57. this.isEndSTRReceived = true;
  58. if (this.isEndSTRReceived)
  59. {
  60. HTTPManager.Logger.Information(nameof(HTTP2WebSocketStream), string.Format("[{0}] All data arrived, data length: {1:N0}", this.Id, this.downloaded), this.Context, this.AssignedRequest.Context, this.parent.Context);
  61. // create a short living thread to process the downloaded data:
  62. PlatformSupport.Threading.ThreadedRunner.RunShortLiving<HTTP2Stream, FramesAsStreamView>(FinishRequest, this, this.dataView);
  63. this.dataView = null;
  64. if (this.State == HTTP2StreamStates.HalfClosedLocal)
  65. this.State = HTTP2StreamStates.Closed;
  66. else
  67. this.State = HTTP2StreamStates.HalfClosedRemote;
  68. }
  69. if (this.isEndSTRReceived || this.localWindow <= this.windowUpdateThreshold)
  70. windowUpdate += this.settings.MySettings[HTTP2Settings.INITIAL_WINDOW_SIZE] - this.localWindow - windowUpdate;
  71. }
  72. catch (Exception ex)
  73. {
  74. HTTPManager.Logger.Exception(nameof(HTTP2WebSocketStream), nameof(ProcessIncomingDATAFrame), ex, this.parent.Context);
  75. }
  76. }
  77. protected override void ProcessOpenState(List<HTTP2FrameHeaderAndPayload> outgoingFrames)
  78. {
  79. try
  80. {
  81. // remote Window can be negative! See https://httpwg.org/specs/rfc7540.html#InitialWindowSize
  82. if (this.remoteWindow <= 0)
  83. {
  84. HTTPManager.Logger.Information(nameof(HTTP2WebSocketStream), string.Format("[{0}] Skipping data sending as remote Window is {1}!", this.Id, this.remoteWindow), this.Context, this.AssignedRequest.Context, this.parent.Context);
  85. return;
  86. }
  87. this.overHTTP2.PreReadCallback();
  88. Int64 maxFragmentSize = Math.Min(BestHTTP.WebSocket.WebSocket.MaxFragmentSize, this.settings.RemoteSettings[HTTP2Settings.MAX_FRAME_SIZE]);
  89. Int64 maxFrameSize = Math.Min(maxFragmentSize, this.remoteWindow);
  90. if (chunkQueue.Count == 0)
  91. {
  92. if (this.overHTTP2.frames.TryDequeue(out var frame))
  93. {
  94. this.overHTTP2._bufferedAmount -= (int)frame.Data.Count;
  95. frame.WriteTo((header, data) => chunkQueue.Add(new KeyValuePair<BufferSegment, BufferSegment>(header, data)), (uint)maxFragmentSize, false, this.Context);
  96. }
  97. }
  98. while (this.remoteWindow >= 6 && chunkQueue.Count > 0)
  99. {
  100. var kvp = chunkQueue[0];
  101. BufferSegment header = kvp.Key;
  102. BufferSegment data = kvp.Value;
  103. int minBytes = header.Count;
  104. int maxBytes = minBytes + data.Count;
  105. // remote window is less than the minimum we have to send, or
  106. // the frame has data but we have space only to send the websocket header
  107. if (this.remoteWindow < minBytes || (maxBytes > minBytes && this.remoteWindow == minBytes))
  108. return;
  109. HTTP2FrameHeaderAndPayload headerFrame = new HTTP2FrameHeaderAndPayload();
  110. headerFrame.Type = HTTP2FrameTypes.DATA;
  111. headerFrame.StreamId = this.Id;
  112. headerFrame.PayloadOffset = (uint)header.Offset;
  113. headerFrame.PayloadLength = (uint)header.Count;
  114. headerFrame.Payload = header.Data;
  115. headerFrame.DontUseMemPool = false;
  116. if (data.Count > 0)
  117. {
  118. HTTP2FrameHeaderAndPayload dataFrame = new HTTP2FrameHeaderAndPayload();
  119. dataFrame.Type = HTTP2FrameTypes.DATA;
  120. dataFrame.StreamId = this.Id;
  121. var buff = data.Slice(data.Offset, (int)Math.Min(data.Count, maxFrameSize));
  122. dataFrame.PayloadOffset = (uint)buff.Offset;
  123. dataFrame.PayloadLength = (uint)buff.Count;
  124. dataFrame.Payload = buff.Data;
  125. data = data.Slice(buff.Offset + buff.Count);
  126. if (data.Count == 0)
  127. chunkQueue.RemoveAt(0);
  128. else
  129. chunkQueue[0] = new KeyValuePair<BufferSegment, BufferSegment>(header, data);
  130. // release the buffer only with the final frame and with the final frame's last data chunk
  131. bool isLast = (header.Data[header.Offset] & 0x80) != 0 /*&& chunkQueue.Count == 0*/;
  132. dataFrame.DontUseMemPool = !isLast;
  133. this.outgoing.Enqueue(headerFrame);
  134. this.outgoing.Enqueue(dataFrame);
  135. }
  136. else
  137. {
  138. this.outgoing.Enqueue(headerFrame);
  139. chunkQueue.RemoveAt(0);
  140. }
  141. }
  142. }
  143. catch (Exception ex)
  144. {
  145. HTTPManager.Logger.Exception(nameof(HTTP2WebSocketStream), nameof(ProcessOpenState), ex, this.parent.Context);
  146. }
  147. }
  148. }
  149. }
  150. #endif