HTTP2Handler.cs 34 KB


  1. #if (!UNITY_WEBGL || UNITY_EDITOR) && !BESTHTTP_DISABLE_ALTERNATE_SSL && !BESTHTTP_DISABLE_HTTP2
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Threading;
  5. using System.Collections.Concurrent;
  6. using BestHTTP.Extensions;
  7. using BestHTTP.Core;
  8. using BestHTTP.PlatformSupport.Memory;
  9. using BestHTTP.Logger;
  10. using BestHTTP.PlatformSupport.Threading;
  11. namespace BestHTTP.Connections.HTTP2
  12. {
  13. public sealed class HTTP2Handler : IHTTPRequestHandler
  14. {
  /// <summary>This handler always reports that it has a custom request processor.</summary>
  public bool HasCustomRequestProcessor => true;

  /// <summary>No keep-alive header is tracked by this handler; always <c>null</c>.</summary>
  public KeepAliveHeader KeepAlive => null;

  /// <summary>
  /// <c>true</c> while the handler is still running and no GOAWAY frame has been sent by it
  /// (<c>goAwaySentAt</c> still holds its <see cref="DateTime.MaxValue"/> sentinel).
  /// </summary>
  public bool CanProcessMultiple => this.isRunning && this.goAwaySentAt == DateTime.MaxValue;
  // The connection preface starts with the string "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" (24 octets).
  private static readonly byte[] MAGIC = new byte[24] { 0x50, 0x52, 0x49, 0x20, 0x2a, 0x20, 0x48, 0x54, 0x54, 0x50, 0x2f, 0x32, 0x2e, 0x30, 0x0d, 0x0a, 0x0d, 0x0a, 0x53, 0x4d, 0x0d, 0x0a, 0x0d, 0x0a };
  // Largest value representable on 31 bits (0x7FFFFFFF).
  public const UInt32 MaxValueFor31Bits = 0xFFFFFFFF >> 1;
  // Average of the buffered round-trip times in milliseconds; updated by the read thread when a ping ACK arrives.
  public double Latency { get; private set; }
  // Created in the constructor.
  public HTTP2SettingsManager settings;
  // Created at the start of RunHandler.
  public HPACKEncoder HPACKEncoder;
  public LoggingContext Context { get; private set; }
  // When the last PING frame was queued by the sender thread.
  private DateTime lastPingSent = DateTime.MinValue;
  private TimeSpan pingFrequency = TimeSpan.MaxValue; // going to be overridden in RunHandler
  // 1 while a sent ping is still waiting for its ACK, 0 otherwise. Accessed through Interlocked from both threads.
  private int waitingForPingAck = 0;
  // Capacity used for the rtts buffer below.
  public static int RTTBufferCapacity = 5;
  // Most recent round-trip times (in milliseconds), the source of the Latency value.
  private CircularBuffer<double> rtts = new CircularBuffer<double>(RTTBufferCapacity);
  // Loop condition of both the sender (RunHandler) and the reader (ReadThread) loops.
  private volatile bool isRunning;
  // Wakes up the sender thread; set when a new request is queued, a frame is received or a gentle shutdown is requested.
  private AutoResetEvent newFrameSignal = new AutoResetEvent(false);
  // Requests waiting to be assigned to a new stream by the sender thread.
  private ConcurrentQueue<HTTPRequest> requestQueue = new ConcurrentQueue<HTTPRequest>();
  // Streams created by RunHandler for dequeued requests.
  private List<HTTP2Stream> clientInitiatedStreams = new List<HTTP2Stream>();
  // Frames read by the read thread, waiting to be processed on the sender thread.
  private ConcurrentQueue<HTTP2FrameHeaderAndPayload> newFrames = new ConcurrentQueue<HTTP2FrameHeaderAndPayload>();
  // Frames collected during one sender-loop iteration, waiting to be written to the network.
  private List<HTTP2FrameHeaderAndPayload> outgoingFrames = new List<HTTP2FrameHeaderAndPayload>();
  // Tracked connection-level window of the remote peer: decreased by sent DATA payloads, increased by WINDOW_UPDATE frames.
  private UInt32 remoteWindow;
  // Last time a request was queued or a stream was processed; used for the idle-timeout check.
  private DateTime lastInteraction;
  // When this handler sent its GOAWAY frame; DateTime.MaxValue means none has been sent yet.
  private DateTime goAwaySentAt = DateTime.MaxValue;
  private HTTPConnection conn;
  // Number of threads (sender/reader) that already ran TryToCleanup.
  private int threadExitCount;
  // How long the sender waits after sending a GOAWAY before it quits: 2.5 x Latency, but at least 1500 ms.
  // TimeSpan.MaxValue while no GOAWAY has been sent.
  private TimeSpan MaxGoAwayWaitTime { get { return this.goAwaySentAt == DateTime.MaxValue ? TimeSpan.MaxValue : TimeSpan.FromMilliseconds(Math.Max(this.Latency * 2.5, 1500)); } }
  // https://httpwg.org/specs/rfc7540.html#StreamIdentifiers
  // Streams initiated by a client MUST use odd-numbered stream identifiers.
  // With an initial value of -1 and an increment of 2, the first client initiated stream's id is going to be 1.
  private long LastStreamId = -1;
  /// <summary>
  /// Creates a handler bound to <paramref name="conn"/>, marks it as running and immediately
  /// queues the connection's current request for processing.
  /// </summary>
  /// <param name="conn">The connection whose stream this handler reads from and writes to.</param>
  public HTTP2Handler(HTTPConnection conn)
  {
      // Initialization order is kept on purpose: the logging context is available before
      // the settings manager gets a reference to this handler.
      this.Context = new LoggingContext(this);
      this.conn = conn;
      this.isRunning = true;
      this.settings = new HTTP2SettingsManager(this);

      HTTPRequest initialRequest = this.conn.CurrentRequest;
      this.Process(initialRequest);
  }
  /// <summary>
  /// Queues <paramref name="request"/> for the sender thread and wakes that thread up,
  /// unless one of the handler's threads has already exited.
  /// </summary>
  /// <param name="request">The request to send over this HTTP/2 connection.</param>
  public void Process(HTTPRequest request)
  {
      HTTPManager.Logger.Information("HTTP2Handler", "Process request called", this.Context, request.Context);

      DateTime utcNow = DateTime.UtcNow;
      request.QueuedAt = DateTime.MinValue;
      this.lastInteraction = utcNow;
      request.ProcessingStarted = utcNow;

      this.requestQueue.Enqueue(request);

      // We might have added the request to a dead queue, in which case signaling would be pointless.
      // When the ConnectionEventHelper processes the Close state-change event, requests still in the
      // queue are going to be resent. (We should avoid resending the request right now, as it might
      // still select this connection/handler, resulting in an infinite loop.)
      bool aThreadAlreadyExited = Volatile.Read(ref this.threadExitCount) != 0;
      if (aThreadAlreadyExited)
          return;

      this.newFrameSignal.Set();
  }
  /// <summary>
  /// Wakes up the sender thread if its wake-up event still exists
  /// (the event is set to null once both threads have finished).
  /// </summary>
  public void SignalRunnerThread()
  {
      AutoResetEvent signal = this.newFrameSignal;
      if (signal != null)
          signal.Set();
  }
  /// <summary>
  /// Body of the sender ("process") thread. It starts the read thread, sends the connection preface
  /// and the initial settings, then loops while the handler is running. Each iteration:
  /// optionally sleeps until the next due event, sends a ping when due, processes frames received by the
  /// read thread, creates streams for queued requests, lets the streams produce frames and finally writes
  /// the collected frames to the network. When the loop ends (normally or by an exception) remaining
  /// streams are aborted and the connection's streams are disposed.
  /// </summary>
  public void RunHandler()
  {
      HTTPManager.Logger.Information("HTTP2Handler", "Processing thread up and running!", this.Context);
      ThreadedRunner.SetThreadName("BestHTTP.HTTP2 Process");
      PlatformSupport.Threading.ThreadedRunner.RunLongLiving(ReadThread);
      try
      {
          bool atLeastOneStreamHasAFrameToSend = true;
          this.HPACKEncoder = new HPACKEncoder(this, this.settings);
          // https://httpwg.org/specs/rfc7540.html#InitialWindowSize
          // The connection flow-control window is also 65,535 octets.
          this.remoteWindow = this.settings.RemoteSettings[HTTP2Settings.INITIAL_WINDOW_SIZE];
          // We want to pack as much data as we can in one tcp segment, but by setting the buffer's size too high
          // we might keep data too long and send it in bursts instead of in a steady stream.
          // Keeping it too low might result in a full tcp segment and one with very low payload.
          // It's possible that one full tcp segment sized buffer would be the best, or a multiple of it.
          // It would keep the network busy without any fragments. The ethernet layer has a maximum of 1500 bytes,
          // but there are two layers of 20 byte headers each, so as a theoretical maximum it's 1500-20-20 bytes.
          // On the other hand, if the buffer is small (1-2), that means that for larger data we have to do a lot
          // of system calls, in that case a larger buffer might be better. Still, if we are not cpu bound,
          // a well saturated network might serve us better.
          using (WriteOnlyBufferedStream bufferedStream = new WriteOnlyBufferedStream(this.conn.connector.Stream, 1024 * 1024 /*1500 - 20 - 20*/))
          {
              // The client connection preface starts with a sequence of 24 octets
              bufferedStream.Write(MAGIC, 0, MAGIC.Length);
              // This sequence MUST be followed by a SETTINGS frame (Section 6.5), which MAY be empty.
              // The client sends the client connection preface immediately upon receipt of a
              // 101 (Switching Protocols) response (indicating a successful upgrade)
              // or as the first application data octets of a TLS connection

              // Set streams' initial window size to the configured value.
              this.settings.InitiatedMySettings[HTTP2Settings.INITIAL_WINDOW_SIZE] = HTTPManager.HTTP2Settings.InitialStreamWindowSize;
              this.settings.InitiatedMySettings[HTTP2Settings.MAX_CONCURRENT_STREAMS] = HTTPManager.HTTP2Settings.MaxConcurrentStreams;
              this.settings.InitiatedMySettings[HTTP2Settings.ENABLE_CONNECT_PROTOCOL] = (uint)(HTTPManager.HTTP2Settings.EnableConnectProtocol ? 1 : 0);
              this.settings.InitiatedMySettings[HTTP2Settings.ENABLE_PUSH] = 0;
              this.settings.SendChanges(this.outgoingFrames);
              this.settings.RemoteSettings.OnSettingChangedEvent += OnRemoteSettingChanged;

              // The default window size for the whole connection is 65535 bytes,
              // but we want to set it to the configured (possibly maximum) value.
              Int64 initialConnectionWindowSize = HTTPManager.HTTP2Settings.InitialConnectionWindowSize;
              // yandex.ru returns with an FLOW_CONTROL_ERROR (3) error when the plugin tries to set the connection window to 2^31 - 1
              // and works only with a maximum value of 2^31 - 10Mib (10 * 1024 * 1024).
              if (initialConnectionWindowSize == HTTP2Handler.MaxValueFor31Bits)
                  initialConnectionWindowSize -= 10 * 1024 * 1024;
              Int64 diff = initialConnectionWindowSize - 65535;
              if (diff > 0)
                  this.outgoingFrames.Add(HTTP2FrameHelper.CreateWindowUpdateFrame(0, (UInt32)diff));

              this.pingFrequency = HTTPManager.HTTP2Settings.PingFrequency;

              while (this.isRunning)
              {
                  DateTime now = DateTime.UtcNow;
                  if (!atLeastOneStreamHasAFrameToSend)
                  {
                      // buffered stream will call flush automatically if its internal buffer is full.
                      // But we have to make sure that we flush remaining data before we go to sleep.
                      bufferedStream.Flush();

                      // Wait until we have to send the next ping, OR a new frame is received on the read thread.
                      //  lastPingSent          Now   lastPingSent+frequency   lastPingSent+Ping timeout
                      //----|---------------------|---------------|----------------------|----------------------|------------|
                      //  lastInteraction                                                          lastInteraction + MaxIdleTime
                      var sendPingAt = this.lastPingSent + this.pingFrequency;
                      var timeoutAt = this.waitingForPingAck != 0 ? this.lastPingSent + HTTPManager.HTTP2Settings.Timeout : DateTime.MaxValue;
                      var nextPingInteraction = sendPingAt < timeoutAt ? sendPingAt : timeoutAt;
                      var disconnectByIdleAt = this.lastInteraction + HTTPManager.HTTP2Settings.MaxIdleTime;
                      var nextDueClientInteractionAt = nextPingInteraction < disconnectByIdleAt ? nextPingInteraction : disconnectByIdleAt;
                      int wait = (int)(nextDueClientInteractionAt - now).TotalMilliseconds;
                      wait = (int)Math.Min(wait, this.MaxGoAwayWaitTime.TotalMilliseconds);

                      // Streams can also ask for an earlier wake-up.
                      TimeSpan nextStreamInteraction = TimeSpan.MaxValue;
                      for (int i = 0; i < this.clientInitiatedStreams.Count; i++)
                      {
                          var streamInteraction = this.clientInitiatedStreams[i].NextInteraction;
                          if (streamInteraction < nextStreamInteraction)
                              nextStreamInteraction = streamInteraction;
                      }
                      wait = (int)Math.Min(wait, nextStreamInteraction.TotalMilliseconds);

                      if (wait >= 1)
                      {
                          if (HTTPManager.Logger.Level <= Logger.Loglevels.All)
                              HTTPManager.Logger.Information("HTTP2Handler", string.Format("Sleeping for {0:N0}ms", wait), this.Context);
                          this.newFrameSignal.WaitOne(wait);
                          now = DateTime.UtcNow;
                      }
                  }

                  // Don't send a new ping until a pong is received for the last one
                  if (now - this.lastPingSent >= this.pingFrequency && Interlocked.CompareExchange(ref this.waitingForPingAck, 1, 0) == 0)
                  {
                      this.lastPingSent = now;
                      var frame = HTTP2FrameHelper.CreatePingFrame(HTTP2PingFlags.None);
                      // The payload carries the send time, the read thread uses it to calculate the round-trip time.
                      BufferHelper.SetLong(frame.Payload, 0, now.Ticks);
                      this.outgoingFrames.Add(frame);
                  }

                  // If no pong received in a (configurable) reasonable time, treat the connection broken
                  if (this.waitingForPingAck != 0 && now - this.lastPingSent >= HTTPManager.HTTP2Settings.Timeout)
                      throw new TimeoutException("Ping ACK isn't received in time!");

                  // Process received frames
                  HTTP2FrameHeaderAndPayload header;
                  while (this.newFrames.TryDequeue(out header))
                  {
                      if (header.StreamId > 0)
                      {
                          HTTP2Stream http2Stream = FindStreamById(header.StreamId);
                          // Add frame to the stream, so it can process it when its Process function is called
                          if (http2Stream != null)
                          {
                              http2Stream.AddFrame(header, this.outgoingFrames);
                          }
                          else
                          {
                              // Error? It's possible that we closed and removed the stream while the server was in the middle of sending frames.
                              // http2Stream is null in this branch, so only the handler's own context can be passed to the logger.
                              if (HTTPManager.Logger.Level == Loglevels.All)
                                  HTTPManager.Logger.Warning("HTTP2Handler", string.Format("No stream found for id: {0}! Can't deliver frame: {1}", header.StreamId, header), this.Context);
                          }
                      }
                      else
                      {
                          // Connection-level frame (stream id 0)
                          switch (header.Type)
                          {
                              case HTTP2FrameTypes.SETTINGS:
                                  this.settings.Process(header, this.outgoingFrames);
                                  PluginEventHelper.EnqueuePluginEvent(
                                      new PluginEventInfo(PluginEvents.HTTP2ConnectProtocol,
                                          new HTTP2ConnectProtocolInfo(this.conn.LastProcessedUri.Host,
                                              this.settings.MySettings[HTTP2Settings.ENABLE_CONNECT_PROTOCOL] == 1 && this.settings.RemoteSettings[HTTP2Settings.ENABLE_CONNECT_PROTOCOL] == 1)));
                                  break;

                              case HTTP2FrameTypes.PING:
                                  var pingFrame = HTTP2FrameHelper.ReadPingFrame(header);
                                  // https://httpwg.org/specs/rfc7540.html#PING
                                  // if it wasn't an ack for our ping, we have to send one
                                  if ((pingFrame.Flags & HTTP2PingFlags.ACK) == 0)
                                  {
                                      var frame = HTTP2FrameHelper.CreatePingFrame(HTTP2PingFlags.ACK);
                                      Array.Copy(pingFrame.OpaqueData, 0, frame.Payload, 0, pingFrame.OpaqueDataLength);
                                      this.outgoingFrames.Add(frame);
                                  }
                                  BufferPool.Release(pingFrame.OpaqueData);
                                  break;

                              case HTTP2FrameTypes.WINDOW_UPDATE:
                                  var windowUpdateFrame = HTTP2FrameHelper.ReadWindowUpdateFrame(header);
                                  this.remoteWindow += windowUpdateFrame.WindowSizeIncrement;
                                  break;

                              case HTTP2FrameTypes.GOAWAY:
                                  // parse the frame, so we can print out detailed information
                                  HTTP2GoAwayFrame goAwayFrame = HTTP2FrameHelper.ReadGoAwayFrame(header);
                                  HTTPManager.Logger.Information("HTTP2Handler", "Received GOAWAY frame: " + goAwayFrame.ToString(), this.Context);
                                  string msg = string.Format("Server closing the connection! Error code: {0} ({1}) Additonal Debug Data: {2}", goAwayFrame.Error, goAwayFrame.ErrorCode, new BufferSegment(goAwayFrame.AdditionalDebugData, 0, (int)goAwayFrame.AdditionalDebugDataLength));
                                  for (int i = 0; i < this.clientInitiatedStreams.Count; ++i)
                                      this.clientInitiatedStreams[i].Abort(msg);
                                  this.clientInitiatedStreams.Clear();
                                  // set the running flag to false, so the thread can exit
                                  this.isRunning = false;
                                  BufferPool.Release(goAwayFrame.AdditionalDebugData);
                                  //this.conn.State = HTTPConnectionStates.Closed;
                                  break;

                              case HTTP2FrameTypes.ALT_SVC:
                                  //HTTP2AltSVCFrame altSvcFrame = HTTP2FrameHelper.ReadAltSvcFrame(header);
                                  // Implement
                                  //HTTPManager.EnqueuePluginEvent(new PluginEventInfo(PluginEvents.AltSvcHeader, new AltSvcEventInfo(altSvcFrame.Origin, ))
                                  break;
                          }

                          // Connection-level frames are fully consumed here, their payload can go back to the pool.
                          if (header.Payload != null)
                              BufferPool.Release(header.Payload);
                      }
                  }

                  UInt32 maxConcurrentStreams = Math.Min(HTTPManager.HTTP2Settings.MaxConcurrentStreams, this.settings.RemoteSettings[HTTP2Settings.MAX_CONCURRENT_STREAMS]);
                  // pre-test stream count and the running flag before touching the request queue.
                  if (this.clientInitiatedStreams.Count < maxConcurrentStreams && this.isRunning)
                  {
                      // grab requests from queue
                      HTTPRequest request;
                      while (this.clientInitiatedStreams.Count < maxConcurrentStreams && this.requestQueue.TryDequeue(out request))
                      {
                          HTTP2Stream newStream = null;
  #if !BESTHTTP_DISABLE_WEBSOCKET
                          if (request.Tag is WebSocket.OverHTTP2)
                          {
                              newStream = new HTTP2WebSocketStream((UInt32)Interlocked.Add(ref LastStreamId, 2), this, this.settings, this.HPACKEncoder);
                          }
                          else
  #endif
                          {
                              newStream = new HTTP2Stream((UInt32)Interlocked.Add(ref LastStreamId, 2), this, this.settings, this.HPACKEncoder);
                          }
                          newStream.Assign(request);
                          this.clientInitiatedStreams.Add(newStream);
                      }
                  }

                  // send any settings changes
                  this.settings.SendChanges(this.outgoingFrames);

                  atLeastOneStreamHasAFrameToSend = false;
                  // process other streams
                  // Room for improvement: Streams should be processed by their priority!
                  for (int i = 0; i < this.clientInitiatedStreams.Count; ++i)
                  {
                      var stream = this.clientInitiatedStreams[i];
                      stream.Process(this.outgoingFrames);
                      // remove closed, empty streams (not enough to check the closed flag, a closed stream still can contain frames to send)
                      if (stream.State == HTTP2StreamStates.Closed && !stream.HasFrameToSend)
                      {
                          this.clientInitiatedStreams.RemoveAt(i--);
                          stream.Removed();
                      }
                      atLeastOneStreamHasAFrameToSend |= stream.HasFrameToSend;
                      this.lastInteraction = DateTime.UtcNow;
                  }

                  // If we encounter a data frame that's too large for the current remote window, we have to stop
                  // sending all data frames as we could send smaller data frames before the large ones.
                  // Room for improvement: An improvement would be here to stop data frame sending per-stream.
                  bool haltDataSending = false;

                  if (this.ShutdownType == ShutdownTypes.Running && now - this.lastInteraction >= HTTPManager.HTTP2Settings.MaxIdleTime)
                  {
                      this.lastInteraction = DateTime.UtcNow;
                      HTTPManager.Logger.Information("HTTP2Handler", "Reached idle time, sending GoAway frame!", this.Context);
                      this.outgoingFrames.Add(HTTP2FrameHelper.CreateGoAwayFrame(0, HTTP2ErrorCodes.NO_ERROR));
                      this.goAwaySentAt = DateTime.UtcNow;
                  }

                  // https://httpwg.org/specs/rfc7540.html#GOAWAY
                  // Endpoints SHOULD always send a GOAWAY frame before closing a connection so that the remote peer can know whether a stream has been partially processed or not.
                  if (this.ShutdownType == ShutdownTypes.Gentle)
                  {
                      HTTPManager.Logger.Information("HTTP2Handler", "Connection abort requested, sending GoAway frame!", this.Context);
                      this.outgoingFrames.Clear();
                      this.outgoingFrames.Add(HTTP2FrameHelper.CreateGoAwayFrame(0, HTTP2ErrorCodes.NO_ERROR));
                      this.goAwaySentAt = DateTime.UtcNow;
                  }

                  if (this.isRunning && now - goAwaySentAt >= this.MaxGoAwayWaitTime)
                  {
                      HTTPManager.Logger.Information("HTTP2Handler", "No GoAway frame received back. Really quitting now!", this.Context);
                      this.isRunning = false;
                      //conn.State = HTTPConnectionStates.Closed;
                  }

                  // Sum of the stream-level window updates sent in this round; the same amount is
                  // sent as a connection-level (stream id 0) window update after the loop.
                  uint streamWindowUpdates = 0;

                  // Go through all the collected frames and send them.
                  for (int i = 0; i < this.outgoingFrames.Count; ++i)
                  {
                      var frame = this.outgoingFrames[i];
                      if (HTTPManager.Logger.Level <= Logger.Loglevels.All && frame.Type != HTTP2FrameTypes.DATA /*&& frame.Type != HTTP2FrameTypes.PING*/)
                          HTTPManager.Logger.Information("HTTP2Handler", "Sending frame: " + frame.ToString(), this.Context);

                      // post process frames
                      switch (frame.Type)
                      {
                          case HTTP2FrameTypes.DATA:
                              // Skipped DATA frames remain in outgoingFrames for a later round.
                              if (haltDataSending)
                                  continue;
                              // if the tracked remoteWindow is smaller than the frame's payload, we stop sending
                              // data frames until we receive window-update frames
                              if (frame.PayloadLength > this.remoteWindow)
                              {
                                  haltDataSending = true;
                                  HTTPManager.Logger.Warning("HTTP2Handler", string.Format("Data sending halted for this round. Remote Window: {0:N0}, frame: {1}", this.remoteWindow, frame.ToString()), this.Context);
                                  continue;
                              }
                              break;

                          case HTTP2FrameTypes.WINDOW_UPDATE:
                              if (frame.StreamId > 0)
                                  streamWindowUpdates += BufferHelper.ReadUInt31(frame.Payload, 0);
                              break;
                      }

                      this.outgoingFrames.RemoveAt(i--);

                      using (var buffer = HTTP2FrameHelper.HeaderAsBinary(frame))
                          bufferedStream.Write(buffer.Data, 0, buffer.Length);

                      if (frame.PayloadLength > 0)
                      {
                          bufferedStream.Write(frame.Payload, (int)frame.PayloadOffset, (int)frame.PayloadLength);
                          if (!frame.DontUseMemPool)
                              BufferPool.Release(frame.Payload);
                      }

                      if (frame.Type == HTTP2FrameTypes.DATA)
                          this.remoteWindow -= frame.PayloadLength;
                  }

                  if (streamWindowUpdates > 0)
                  {
                      var frame = HTTP2FrameHelper.CreateWindowUpdateFrame(0, streamWindowUpdates);
                      if (HTTPManager.Logger.Level <= Logger.Loglevels.All)
                          HTTPManager.Logger.Information("HTTP2Handler", "Sending frame: " + frame.ToString(), this.Context);
                      using (var buffer = HTTP2FrameHelper.HeaderAsBinary(frame))
                          bufferedStream.Write(buffer.Data, 0, buffer.Length);
                      bufferedStream.Write(frame.Payload, (int)frame.PayloadOffset, (int)frame.PayloadLength);
                      if (!frame.DontUseMemPool)
                          BufferPool.Release(frame.Payload);
                  }
              } // while (this.isRunning)

              bufferedStream.Flush();
          }
      }
      catch (Exception ex)
      {
          // Log out the exception if it's a non-expected one.
          if (this.ShutdownType == ShutdownTypes.Running && this.goAwaySentAt == DateTime.MaxValue && !HTTPManager.IsQuitting)
              HTTPManager.Logger.Exception("HTTP2Handler", "Sender thread", ex, this.Context);
      }
      finally
      {
          TryToCleanup();
          HTTPManager.Logger.Information("HTTP2Handler", "Sender thread closing - cleaning up remaining request...", this.Context);
          for (int i = 0; i < this.clientInitiatedStreams.Count; ++i)
              this.clientInitiatedStreams[i].Abort("Connection closed unexpectedly");
          this.clientInitiatedStreams.Clear();
          HTTPManager.Logger.Information("HTTP2Handler", "Sender thread closing", this.Context);
      }

      // Best-effort disposal of the underlying streams; the empty using blocks only call Dispose.
      try
      {
          if (this.conn != null && this.conn.connector != null)
          {
              // Works in the new runtime
              if (this.conn.connector.TopmostStream != null)
                  using (this.conn.connector.TopmostStream) { }
              // Works in the old runtime
              if (this.conn.connector.Stream != null)
                  using (this.conn.connector.Stream) { }
          }
      }
      catch
      { }
  }
  /// <summary>
  /// Handler of the remote settings' change event. Only INITIAL_WINDOW_SIZE changes are acted on:
  /// the tracked remote window becomes the new value minus the amount already used from the old one.
  /// </summary>
  private void OnRemoteSettingChanged(HTTP2SettingsRegistry registry, HTTP2Settings setting, uint oldValue, uint newValue)
  {
      if (setting != HTTP2Settings.INITIAL_WINDOW_SIZE)
          return;

      // Amount consumed from the previous window (same unsigned arithmetic as a single expression).
      uint alreadyUsed = oldValue - this.remoteWindow;
      this.remoteWindow = newValue - alreadyUsed;
  }
  /// <summary>
  /// Body of the reader thread. Reads frames from the connection's stream while the handler is running,
  /// hands every frame over to the sender thread through the newFrames queue and wakes that thread up.
  /// Ping ACKs are additionally handled here to update the round-trip time statistics.
  /// The thread exits on a GOAWAY frame, on any exception, or when the running flag is cleared.
  /// </summary>
  private void ReadThread()
  {
      try
      {
          ThreadedRunner.SetThreadName("BestHTTP.HTTP2 Read");
          HTTPManager.Logger.Information("HTTP2Handler", "Reader thread up and running!", this.Context);

          while (this.isRunning)
          {
              HTTP2FrameHeaderAndPayload header = HTTP2FrameHelper.ReadHeader(this.conn.connector.Stream);

              if (HTTPManager.Logger.Level <= Logger.Loglevels.Information && header.Type != HTTP2FrameTypes.DATA /*&& header.Type != HTTP2FrameTypes.PING*/)
                  HTTPManager.Logger.Information("HTTP2Handler", "New frame received: " + header.ToString(), this.Context);

              // Add the new frame to the queue. Processing it on the write thread gives us the advantage that
              // we don't have to deal with too much locking.
              this.newFrames.Enqueue(header);

              // ping write thread to process the new frame
              this.newFrameSignal.Set();

              switch (header.Type)
              {
                  // Handle pongs on the read thread, so no additional latency is added to the rtt calculation.
                  case HTTP2FrameTypes.PING:
                      var pingFrame = HTTP2FrameHelper.ReadPingFrame(header);

                      // Only an ACK that we were actually waiting for updates the statistics.
                      // (CompareExchange returns the previous value: 0 means we aren't expecting a ping ack.)
                      if ((pingFrame.Flags & HTTP2PingFlags.ACK) != 0 &&
                          Interlocked.CompareExchange(ref this.waitingForPingAck, 0, 1) != 0)
                      {
                          // it was an ack, payload must contain what we sent
                          var ticks = BufferHelper.ReadLong(pingFrame.OpaqueData, 0);
                          // the difference between the current time and the time when the ping message is sent
                          TimeSpan diff = TimeSpan.FromTicks(DateTime.UtcNow.Ticks - ticks);
                          // add it to the buffer
                          this.rtts.Add(diff.TotalMilliseconds);
                          // and calculate the new latency
                          this.Latency = CalculateLatency();
                          HTTPManager.Logger.Verbose("HTTP2Handler", string.Format("Latency: {0:F2}ms, RTT buffer: {1}", this.Latency, this.rtts.ToString()), this.Context);
                      }

                      // Release the frame's buffer on every path, including the unexpected-ACK one.
                      BufferPool.Release(pingFrame.OpaqueData);
                      break;

                  case HTTP2FrameTypes.GOAWAY:
                      // Just exit from this thread. The processing thread will handle the frame too.
                      // Risking a double release here if the processing thread also consumed the goaway frame
                      //if (Volatile.Read(ref this.threadExitCount) > 0)
                      //    BufferPool.Release(header.Payload);
                      return;
              }
          }
      }
      catch //(Exception ex)
      {
          // Exceptions are deliberately not logged here; the finally block below runs the cleanup.
          //HTTPManager.Logger.Exception("HTTP2Handler", "", ex, this.Context);
          //this.isRunning = false;
      }
      finally
      {
          TryToCleanup();
          HTTPManager.Logger.Information("HTTP2Handler", "Reader thread closing", this.Context);
      }
  }
  /// <summary>
  /// Called by both the sender and the reader thread when they exit. Clears the running flag and counts
  /// the exits: the first exiting thread enqueues the connection's Closed event, the second one closes the
  /// wake-up event and releases the payloads of the frames that were never processed.
  /// </summary>
  private void TryToCleanup()
  {
      this.isRunning = false;

      int exitedThreads = Interlocked.Increment(ref this.threadExitCount);

      if (exitedThreads == 1)
      {
          // First thread closing notifies the ConnectionEventHelper
          ConnectionEventHelper.EnqueueConnectionEvent(new ConnectionEventInfo(this.conn, HTTPConnectionStates.Closed));
      }
      else if (exitedThreads == 2)
      {
          // Last thread closes the AutoResetEvent
          AutoResetEvent signal = this.newFrameSignal;
          if (signal != null)
              signal.Close();
          this.newFrameSignal = null;

          HTTP2FrameHeaderAndPayload leftover;
          while (this.newFrames.TryDequeue(out leftover))
              BufferPool.Release(leftover.Payload);
      }
      else
      {
          HTTPManager.Logger.Warning("HTTP2Handler", String.Format("TryToCleanup - counter is {0}!", exitedThreads));
      }
  }
  /// <summary>
  /// Returns the arithmetic mean of the buffered round-trip times, or 0 when the buffer is empty.
  /// </summary>
  private double CalculateLatency()
  {
      var sampleCount = this.rtts.Count;
      if (sampleCount == 0)
          return 0;

      // Samples are added in index order, exactly one pass over the buffer.
      double total = 0;
      int index = 0;
      while (index < sampleCount)
      {
          total += this.rtts[index];
          index++;
      }

      return total / sampleCount;
  }
  /// <summary>
  /// Looks up a client initiated stream by its id.
  /// </summary>
  /// <param name="streamId">Id of the stream to find.</param>
  /// <returns>The first stream with a matching id, or <c>null</c> if there's none.</returns>
  HTTP2Stream FindStreamById(UInt32 streamId)
  {
      foreach (HTTP2Stream candidate in this.clientInitiatedStreams)
      {
          if (candidate.Id == streamId)
              return candidate;
      }

      return null;
  }
  /// <summary>The shutdown mode last requested through <see cref="Shutdown"/>.</summary>
  public ShutdownTypes ShutdownType { get; private set; }

  /// <summary>
  /// Requests the handler to shut down. A gentle shutdown wakes up the sender thread (which sends a
  /// GOAWAY frame when it sees the Gentle state), an immediate one disposes the connection's stream.
  /// Safe to call after the handler's threads have already exited.
  /// </summary>
  /// <param name="type">The kind of shutdown to perform.</param>
  public void Shutdown(ShutdownTypes type)
  {
      this.ShutdownType = type;

      switch (this.ShutdownType)
      {
          case ShutdownTypes.Gentle:
              // The event is set to null by TryToCleanup once both threads have exited.
              this.newFrameSignal?.Set();
              break;

          case ShutdownTypes.Immediate:
              // The connector and its stream are treated as nullable elsewhere in this class too.
              this.conn?.connector?.Stream?.Dispose();
              break;
      }
  }
  /// <summary>
  /// Drains the request queue: requests with a pending cancellation are marked as aborted,
  /// all others are handed back through a Resend request event.
  /// </summary>
  public void Dispose()
  {
      HTTPRequest pending;
      while (this.requestQueue.TryDequeue(out pending))
      {
          string logMessage = string.Format("Dispose - Request '{0}' IsCancellationRequested: {1}", pending.CurrentUri.ToString(), pending.IsCancellationRequested.ToString());
          HTTPManager.Logger.Information("HTTP2Handler", logMessage, this.Context);

          if (!pending.IsCancellationRequested)
          {
              RequestEventHelper.EnqueueRequestEvent(new RequestEventInfo(pending, RequestEvents.Resend));
              continue;
          }

          pending.Response = null;
          pending.State = HTTPRequestStates.Aborted;
      }
  }
  521. }
  522. }
  523. #endif