BufferedReadNetworkStream.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. using System;
  2. using System.IO;
  3. using BestHTTP.Extensions;
  4. namespace BestHTTP.Connections
  5. {
  /// <summary>
  /// Stream wrapper used for network connections. Reads are served through a
  /// <see cref="ReadOnlyBufferedStream"/> created over the wrapped stream, writes are forwarded
  /// directly to the wrapped stream, and every read/write updates the process-wide network statistics.
  /// </summary>
  /// <remarks>
  /// Each constructed instance counts as one connection; <see cref="Close"/> releases it again.
  /// Seeking is not supported: <see cref="Length"/>, <see cref="Position"/>, <see cref="Seek"/> and
  /// <see cref="SetLength"/> throw <see cref="NotImplementedException"/>.
  /// </remarks>
  public sealed class BufferedReadNetworkStream : Stream
  {
      #region Network Stats

      /// <summary>Total number of bytes returned by <see cref="Read"/> calls since the last reset.</summary>
      // Interlocked.Read keeps the 64-bit load atomic on 32-bit runtimes, where a plain read can tear.
      public static long TotalNetworkBytesReceived { get => System.Threading.Interlocked.Read(ref _totalNetworkBytesReceived); }
      private static long _totalNetworkBytesReceived;

      /// <summary>Atomically adds <paramref name="amount"/> to the received-bytes counter.</summary>
      internal static void IncrementTotalNetworkBytesReceived(int amount) => System.Threading.Interlocked.Add(ref _totalNetworkBytesReceived, amount);

      /// <summary>Total number of bytes passed to successful <see cref="Write"/> calls since the last reset.</summary>
      public static long TotalNetworkBytesSent { get => System.Threading.Interlocked.Read(ref _totalNetworkBytesSent); }
      private static long _totalNetworkBytesSent;

      /// <summary>Atomically adds <paramref name="amount"/> to the sent-bytes counter.</summary>
      internal static void IncrementTotalNetworkBytesSent(int amount) => System.Threading.Interlocked.Add(ref _totalNetworkBytesSent, amount);

      /// <summary>Number of connections counted since the last reset.</summary>
      public static int TotalConnections { get => _totalConnections; }
      private static int _totalConnections;

      /// <summary>Number of connections counted and not yet released.</summary>
      public static int OpenConnections { get => _openConnections; }
      private static int _openConnections;

      /// <summary>Registers a new connection: bumps both the total and the open connection counters.</summary>
      internal static void IncrementCurrentConnections()
      {
          System.Threading.Interlocked.Increment(ref _totalConnections);
          System.Threading.Interlocked.Increment(ref _openConnections);
      }

      /// <summary>Releases one connection from the open connection counter.</summary>
      internal static void DecrementCurrentConnections() => System.Threading.Interlocked.Decrement(ref _openConnections);

      /// <summary>Resets all four statistics counters to zero.</summary>
      internal static void ResetNetworkStats()
      {
          System.Threading.Interlocked.Exchange(ref _totalNetworkBytesReceived, 0);
          System.Threading.Interlocked.Exchange(ref _totalNetworkBytesSent, 0);
          System.Threading.Interlocked.Exchange(ref _totalConnections, 0);
          System.Threading.Interlocked.Exchange(ref _openConnections, 0);
      }

      #endregion

      /// <summary>True while the stream is open; false once <see cref="Close"/> has run.</summary>
      public override bool CanRead { get { return this.readStream != null; } }

      /// <summary>Always false: this stream cannot seek.</summary>
      public override bool CanSeek { get { return false; } }

      /// <summary>True while the stream is open; false once <see cref="Close"/> has run.</summary>
      public override bool CanWrite { get { return this.innerStream != null; } }

      /// <summary>Not supported; always throws <see cref="NotImplementedException"/>.</summary>
      public override long Length { get { throw new NotImplementedException(); } }

      /// <summary>Not supported; getter and setter always throw <see cref="NotImplementedException"/>.</summary>
      public override long Position { get { throw new NotImplementedException(); } set { throw new NotImplementedException(); } }

      // Dedicated monitor for Close(); locking on 'this' would share the monitor with any outside code that locks the instance.
      private readonly object closeLock = new object();

      private ReadOnlyBufferedStream readStream;
      private Stream innerStream;

      /// <summary>
      /// Wraps <paramref name="stream"/> and counts it as a new open connection.
      /// </summary>
      /// <param name="stream">The stream to read from and write to.</param>
      /// <param name="bufferSize">Buffer size handed to the <see cref="ReadOnlyBufferedStream"/> constructor.</param>
      /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
      public BufferedReadNetworkStream(Stream stream, int bufferSize)
      {
          // Reject null up-front: otherwise the connection would be counted but Close() could never release it.
          if (stream == null)
              throw new ArgumentNullException(nameof(stream));

          this.innerStream = stream;
          this.readStream = new ReadOnlyBufferedStream(stream, bufferSize);

          IncrementCurrentConnections();
      }

      /// <summary>
      /// Does nothing: <see cref="Write"/> forwards straight to the wrapped stream, so this class holds no pending output.
      /// </summary>
      public override void Flush()
      {
      }

      /// <summary>
      /// Reads through the buffered reader and adds the number of bytes read to the received-bytes statistic.
      /// </summary>
      /// <returns>The value returned by the buffered reader's Read call.</returns>
      /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
      public override int Read(byte[] buffer, int offset, int count)
      {
          // Take a local copy so a concurrent Close() cannot null the field between the check and the call.
          var reader = this.readStream;
          if (reader == null)
              throw new ObjectDisposedException(nameof(BufferedReadNetworkStream));

          int read = reader.Read(buffer, offset, count);
          if (read > 0)
              IncrementTotalNetworkBytesReceived(read);

          return read;
      }

      /// <summary>Not supported; always throws <see cref="NotImplementedException"/>.</summary>
      public override long Seek(long offset, SeekOrigin origin)
      {
          throw new NotImplementedException();
      }

      /// <summary>Not supported; always throws <see cref="NotImplementedException"/>.</summary>
      public override void SetLength(long value)
      {
          throw new NotImplementedException();
      }

      /// <summary>
      /// Writes directly to the wrapped stream and, once the write has succeeded, adds
      /// <paramref name="count"/> to the sent-bytes statistic.
      /// </summary>
      /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
      public override void Write(byte[] buffer, int offset, int count)
      {
          var target = this.innerStream;
          if (target == null)
              throw new ObjectDisposedException(nameof(BufferedReadNetworkStream));

          target.Write(buffer, offset, count);

          // Count only after the write returned, so a failed write does not inflate the statistic.
          IncrementTotalNetworkBytesSent(count);
      }

      /// <summary>
      /// Closes the wrapped stream and the buffered reader and releases this connection from the
      /// open connection counter. Calling it more than once is safe; only the first call does the work.
      /// </summary>
      public override void Close()
      {
          base.Close();

          lock (this.closeLock)
          {
              // Detach both fields first so the cleanup below runs at most once per instance.
              var inner = this.innerStream;
              var reader = this.readStream;
              this.innerStream = null;
              this.readStream = null;

              try
              {
                  if (inner != null)
                  {
                      DecrementCurrentConnections();
                      inner.Close();
                  }
              }
              finally
              {
                  // Close the reader even when closing the wrapped stream threw.
                  if (reader != null)
                      reader.Close();
              }
          }
      }
  }
  91. // Unused experimental stream. Reading from the inner stream is done in parallel, and Read blocks if no data is buffered.
  92. // Additionally, BC reads 5 bytes for the TLS header, then the payload. Buffering data from the network could save at least one context switch per TLS message.
  93. // In theory, it could help, as reading from the network could be done in parallel with TLS decryption.
  94. // However, if decrypting data is faster than data arrives from the network, waiting longer for data lets SpinWait go into deep sleep, and it is going to
  95. // resume the thread milliseconds after new data is available. Those small delays add up and actually slow down the download.
  96. // Not using locking and just calling TryDequeue until there's data would solve the slow-down, but at the price of using 100% of a CPU core.
  97. // The whole struggle might be worth it if Unity implemented SocketAsyncEventArgs properly.
  98. //sealed class BufferedReadNetworkStream : Stream
  99. //{
  100. // public override bool CanRead => throw new NotImplementedException();
  101. //
  102. // public override bool CanSeek => throw new NotImplementedException();
  103. //
  104. // public override bool CanWrite => throw new NotImplementedException();
  105. //
  106. // public override long Length => throw new NotImplementedException();
  107. //
  108. // public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
  109. //
  110. // byte[] buf;
  111. // int available = 0;
  112. // int pos = 0;
  113. //
  114. // private System.Net.Sockets.Socket client;
  115. // int readBufferSize;
  116. // int bufferSize;
  117. // private System.Threading.SpinWait spinWait = new System.Threading.SpinWait();
  118. //
  119. // System.Collections.Concurrent.ConcurrentQueue<BufferSegment> downloadedData = new System.Collections.Concurrent.ConcurrentQueue<BufferSegment>();
  120. // private int downloadedBytes;
  121. // private System.Threading.SpinWait downWait = new System.Threading.SpinWait();
  122. // private int closed = 0;
  123. //
  124. // //System.Net.Sockets.SocketAsyncEventArgs socketAsyncEventArgs = new System.Net.Sockets.SocketAsyncEventArgs();
  125. //
  126. // //DateTime started;
  127. //
  128. // public BufferedReadNetworkStream(System.Net.Sockets.Socket socket, int readBufferSize, int bufferSize)
  129. // {
  130. // this.client = socket;
  131. // this.readBufferSize = readBufferSize;
  132. // this.bufferSize = bufferSize;
  133. //
  134. // //this.socketAsyncEventArgs.AcceptSocket = this.client;
  135. // //
  136. // //var buffer = BufferPool.Get(this.readBufferSize, true);
  137. // //this.socketAsyncEventArgs.SetBuffer(buffer, 0, buffer.Length);
  138. // //
  139. // ////var bufferList = new List<ArraySegment<byte>>();
  140. // ////for (int i = 0; i < 1; i++)
  141. // ////{
  142. // //// var buffer = BufferPool.Get(this.readBufferSize, true);
  143. // //// bufferList.Add(new ArraySegment<byte>(buffer));
  144. // ////}
  145. // ////this.socketAsyncEventArgs.BufferList = bufferList;
  146. // //
  147. // //this.socketAsyncEventArgs.Completed += SocketAsyncEventArgs_Completed;
  148. // //
  149. // //this.started = DateTime.Now;
  150. // //if (!this.client.ReceiveAsync(this.socketAsyncEventArgs))
  151. // // SocketAsyncEventArgs_Completed(null, this.socketAsyncEventArgs);
  152. //
  153. // BestHTTP.PlatformSupport.Threading.ThreadedRunner.RunShortLiving(() =>
  154. // {
  155. // DateTime started = DateTime.Now;
  156. // try
  157. // {
  158. // while (closed == 0)
  159. // {
  160. // var buffer = BufferPool.Get(this.readBufferSize, true);
  161. //
  162. // int count = this.client.Receive(buffer, 0, buffer.Length, System.Net.Sockets.SocketFlags.None);
  163. // //int count = 0;
  164. // //unsafe {
  165. // // fixed (byte* pBuffer = buffer)
  166. // // {
  167. // // int zero = 0;
  168. // // count = recvfrom(this.client.Handle, pBuffer, buffer.Length, SocketFlags.None, null, ref zero);
  169. // // }
  170. // //}
  171. //
  172. // this.downloadedData.Enqueue(new BufferSegment(buffer, 0, count));
  173. // System.Threading.Interlocked.Add(ref downloadedBytes, count);
  174. //
  175. // if (HTTPManager.Logger.Level <= Logger.Loglevels.Warning)
  176. // HTTPManager.Logger.Warning(nameof(BufferedReadNetworkStream), $"read count: {count:N0} downloadedBytes: {downloadedBytes:N0} / {this.bufferSize:N0}");
  177. //
  178. // if (count <= 0)
  179. // {
  180. // System.Threading.Interlocked.Exchange(ref closed, 1);
  181. // return;
  182. // }
  183. //
  184. // while (downloadedBytes >= this.bufferSize)
  185. // {
  186. // downWait.SpinOnce();
  187. // }
  188. // }
  189. // }
  190. // catch (Exception ex)
  191. // {
  192. // UnityEngine.Debug.LogException(ex);
  193. // }
  194. // finally
  195. // {
  196. // UnityEngine.Debug.Log($"Reading finished in {(DateTime.Now - started)}");
  197. // }
  198. // });
  199. // }
  200. //
  201. // //private void SocketAsyncEventArgs_Completed(object sender, System.Net.Sockets.SocketAsyncEventArgs e)
  202. // //{
  203. // // this.downloadedData.Enqueue(new BufferSegment(e.Buffer, 0, e.BytesTransferred));
  204. // //
  205. // // if (e.BytesTransferred == 0)
  206. // // {
  207. // // UnityEngine.Debug.Log($"Reading finished in {(DateTime.Now - started)}");
  208. // // return;
  209. // // }
  210. // //
  211. // // int down = System.Threading.Interlocked.Add(ref downloadedBytes, e.BytesTransferred);
  212. // //
  213. // // if (HTTPManager.Logger.Level <= Logger.Loglevels.Warning)
  214. // // HTTPManager.Logger.Warning(nameof(BufferedReadNetworkStream), $"SocketAsyncEventArgs_Completed - read count: {e.BytesTransferred:N0} downloadedBytes: {down:N0} / {this.bufferSize:N0}");
  215. // //
  216. // // var buffer = BufferPool.Get(this.readBufferSize, true);
  217. // // this.socketAsyncEventArgs.SetBuffer(buffer, 0, buffer.Length);
  218. // //
  219. // // if (!this.client.ReceiveAsync(this.socketAsyncEventArgs))
  220. // // SocketAsyncEventArgs_Completed(null, this.socketAsyncEventArgs);
  221. // //}
  222. //
  223. // private void SwitchBuffers(bool waitForData)
  224. // {
  225. // //HTTPManager.Logger.Error("Read", $"{this.downloadedData.Count}");
  226. // BufferSegment segment;
  227. // while (!this.downloadedData.TryDequeue(out segment))
  228. // {
  229. // if (waitForData && closed == 0)
  230. // {
  231. // if (HTTPManager.Logger.Level <= Logger.Loglevels.Error)
  232. // HTTPManager.Logger.Error(nameof(BufferedReadNetworkStream), $"SpinOnce");
  233. // this.spinWait.SpinOnce();
  234. // }
  235. // else
  236. // return;
  237. // }
  238. //
  239. // //if (segment.Count <= 0)
  240. // // throw new Exception("Connection closed!");
  241. //
  242. // if (buf != null)
  243. // BufferPool.Release(buf);
  244. //
  245. // System.Threading.Interlocked.Add(ref downloadedBytes, -segment.Count);
  246. //
  247. // buf = segment.Data;
  248. // available = segment.Count;
  249. // pos = 0;
  250. // }
  251. //
  252. // public override int Read(byte[] buffer, int offset, int size)
  253. // {
  254. // if (this.buf == null)
  255. // {
  256. // SwitchBuffers(true);
  257. // }
  258. //
  259. // if (size <= available)
  260. // {
  261. // Array.Copy(buf, pos, buffer, offset, size);
  262. // available -= size;
  263. // pos += size;
  264. //
  265. // if (available == 0)
  266. // {
  267. // SwitchBuffers(false);
  268. // }
  269. //
  270. // return size;
  271. // }
  272. // else
  273. // {
  274. // int readcount = 0;
  275. // if (available > 0)
  276. // {
  277. // Array.Copy(buf, pos, buffer, offset, available);
  278. // offset += available;
  279. // readcount += available;
  280. // available = 0;
  281. // pos = 0;
  282. // }
  283. //
  284. // while (true)
  285. // {
  286. // try
  287. // {
  288. // SwitchBuffers(true);
  289. // }
  290. // catch (Exception ex)
  291. // {
  292. // if (readcount > 0)
  293. // {
  294. // return readcount;
  295. // }
  296. //
  297. // throw (ex);
  298. // }
  299. //
  300. // if (available < 1)
  301. // {
  302. // if (readcount > 0)
  303. // {
  304. // return readcount;
  305. // }
  306. //
  307. // return available;
  308. // }
  309. // else
  310. // {
  311. // int toread = size - readcount;
  312. // if (toread <= available)
  313. // {
  314. // Array.Copy(buf, pos, buffer, offset, toread);
  315. // available -= toread;
  316. // pos += toread;
  317. // readcount += toread;
  318. // return readcount;
  319. // }
  320. // else
  321. // {
  322. // Array.Copy(buf, pos, buffer, offset, available);
  323. // offset += available;
  324. // readcount += available;
  325. // pos = 0;
  326. // available = 0;
  327. // }
  328. // }
  329. // }
  330. // }
  331. // }
  332. //
  333. // public override long Seek(long offset, SeekOrigin origin)
  334. // {
  335. // throw new NotImplementedException();
  336. // }
  337. //
  338. // public override void SetLength(long value)
  339. // {
  340. // throw new NotImplementedException();
  341. // }
  342. //
  343. // public override void Write(byte[] buffer, int offset, int count)
  344. // {
  345. // this.client.Send(buffer, offset, count, System.Net.Sockets.SocketFlags.None);
  346. //
  347. // HTTPManager.Logger.Warning(nameof(BufferedReadNetworkStream), $"Wrote: {count}");
  348. // }
  349. //
  350. // public override void Close()
  351. // {
  352. // base.Close();
  353. //
  354. // //socketAsyncEventArgs.Dispose();
  355. // //socketAsyncEventArgs = null;
  356. // }
  357. //
  358. // public override void Flush()
  359. // {
  360. // }
  361. //}
  362. }