GridFSSeekableDownloadStream.cs 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. /* Copyright 2015-present MongoDB Inc.
  2. *
  3. * Licensed under the Apache License, Version 2.0 (the "License");
  4. * you may not use this file except in compliance with the License.
  5. * You may obtain a copy of the License at
  6. *
  7. * http://www.apache.org/licenses/LICENSE-2.0
  8. *
  9. * Unless required by applicable law or agreed to in writing, software
  10. * distributed under the License is distributed on an "AS IS" BASIS,
  11. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. * See the License for the specific language governing permissions and
  13. * limitations under the License.
  14. */
  15. using System;
  16. using System.Collections.Generic;
  17. using System.IO;
  18. using System.Linq;
  19. using System.Text;
  20. using System.Threading;
  21. using System.Threading.Tasks;
  22. using MongoDB.Bson;
  23. using MongoDB.Bson.Serialization;
  24. using MongoDB.Bson.Serialization.Serializers;
  25. using MongoDB.Driver.Core.Bindings;
  26. using MongoDB.Driver.Core.Misc;
  27. using MongoDB.Driver.Core.Operations;
  28. namespace MongoDB.Driver.GridFS
  29. {
  /// <summary>
  /// A GridFS download stream that supports seeking. Reads fetch the chunk that contains the
  /// current position on demand and keep the most recently fetched chunk in memory.
  /// </summary>
  /// <typeparam name="TFileId">The type of the file identifier.</typeparam>
  internal class GridFSSeekableDownloadStream<TFileId> : GridFSDownloadStreamBase<TFileId>
  {
      // private fields
      private byte[] _chunk; // data of the most recently fetched chunk (null until the first fetch)
      private readonly BsonValue _idAsBsonValue; // file id serialized once for use in chunk filters and exceptions
      private long _n = -1; // index of the chunk held in _chunk; -1 until a chunk has been fetched
      private long _position;

      // constructors
      /// <summary>
      /// Initializes a new instance of the stream and serializes the file id to a <see cref="BsonValue"/>
      /// using the serializer registered for <typeparamref name="TFileId"/> in the bucket options.
      /// </summary>
      /// <param name="bucket">The bucket.</param>
      /// <param name="binding">The binding used to execute the chunk queries.</param>
      /// <param name="fileInfo">The file info of the file being downloaded.</param>
      public GridFSSeekableDownloadStream(
          GridFSBucket<TFileId> bucket,
          IReadBinding binding,
          GridFSFileInfo<TFileId> fileInfo)
          : base(bucket, binding, fileInfo)
      {
          var idSerializer = bucket.Options.SerializerRegistry.GetSerializer<TFileId>();
          var idSerializationInfo = new BsonSerializationInfo("_id", idSerializer, typeof(TFileId));
          _idAsBsonValue = idSerializationInfo.SerializeValue(fileInfo.Id);
      }

      // public properties
      /// <summary>
      /// Gets a value indicating whether the stream supports seeking (always true).
      /// </summary>
      public override bool CanSeek
      {
          get { return true; }
      }

      /// <summary>
      /// Gets or sets the current position. The setter rejects negative values; it does not
      /// check the value against the length of the file.
      /// </summary>
      public override long Position
      {
          get
          {
              return _position;
          }
          set
          {
              Ensure.IsGreaterThanOrEqualToZero(value, nameof(value));
              _position = value;
          }
      }

      // methods
      /// <summary>
      /// Reads up to <paramref name="count"/> bytes starting at the current position and advances the position.
      /// </summary>
      /// <param name="buffer">The destination buffer.</param>
      /// <param name="offset">The offset in <paramref name="buffer"/> at which to start writing.</param>
      /// <param name="count">The maximum number of bytes to read.</param>
      /// <returns>
      /// The number of bytes copied, which is less than <paramref name="count"/> when the end of the file is reached.
      /// </returns>
      public override int Read(byte[] buffer, int offset, int count)
      {
          Ensure.IsNotNull(buffer, nameof(buffer));
          Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset));
          Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count));
          ThrowIfDisposed();

          var bytesRead = 0;
          while (count > 0 && _position < FileInfo.Length)
          {
              // the synchronous Stream.Read signature has no token to flow through
              var segment = GetSegment(CancellationToken.None);
              var partialCount = CopySegment(segment, buffer, offset, count);
              bytesRead += partialCount;
              offset += partialCount;
              count -= partialCount;
          }

          return bytesRead;
      }

      /// <summary>
      /// Asynchronously reads up to <paramref name="count"/> bytes starting at the current position and advances the position.
      /// </summary>
      /// <param name="buffer">The destination buffer.</param>
      /// <param name="offset">The offset in <paramref name="buffer"/> at which to start writing.</param>
      /// <param name="count">The maximum number of bytes to read.</param>
      /// <param name="cancellationToken">The cancellation token passed to the chunk queries.</param>
      /// <returns>
      /// A task whose result is the number of bytes copied, which is less than <paramref name="count"/>
      /// when the end of the file is reached.
      /// </returns>
      public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
      {
          Ensure.IsNotNull(buffer, nameof(buffer));
          Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset));
          Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count));
          ThrowIfDisposed();

          var bytesRead = 0;
          while (count > 0 && _position < FileInfo.Length)
          {
              var segment = await GetSegmentAsync(cancellationToken).ConfigureAwait(false);
              var partialCount = CopySegment(segment, buffer, offset, count);
              bytesRead += partialCount;
              offset += partialCount;
              count -= partialCount;
          }

          return bytesRead;
      }

      /// <summary>
      /// Moves the current position relative to <paramref name="origin"/>.
      /// </summary>
      /// <param name="offset">The offset relative to <paramref name="origin"/>.</param>
      /// <param name="origin">The reference point for <paramref name="offset"/>.</param>
      /// <returns>The new position.</returns>
      /// <exception cref="ArgumentException">The origin is not a known <see cref="SeekOrigin"/> value.</exception>
      /// <exception cref="IOException">The resulting position is negative or greater than the length of the stream.</exception>
      public override long Seek(long offset, SeekOrigin origin)
      {
          long newPosition;
          switch (origin)
          {
              case SeekOrigin.Begin: newPosition = offset; break;
              case SeekOrigin.Current: newPosition = _position + offset; break;
              case SeekOrigin.End: newPosition = Length + offset; break;
              default: throw new ArgumentException("Invalid origin.", nameof(origin));
          }

          if (newPosition < 0)
          {
              throw new IOException("Position must be greater than or equal to zero.");
          }
          if (newPosition > Length)
          {
              throw new IOException("Position must be less than or equal to the length of the stream.");
          }

          Position = newPosition;
          return newPosition;
      }

      // private methods
      /// <summary>
      /// Copies as many bytes as possible (at most <paramref name="count"/>) from <paramref name="segment"/>
      /// into <paramref name="buffer"/> at <paramref name="offset"/> and advances the position by that amount.
      /// </summary>
      /// <returns>The number of bytes copied.</returns>
      private int CopySegment(ArraySegment<byte> segment, byte[] buffer, int offset, int count)
      {
          var partialCount = Math.Min(count, segment.Count);
          Buffer.BlockCopy(segment.Array, segment.Offset, buffer, offset, partialCount);
          _position += partialCount;
          return partialCount;
      }

      /// <summary>
      /// Creates the find operation that looks up chunk <paramref name="n"/> of this file
      /// by its "files_id" and "n" fields.
      /// </summary>
      private FindOperation<BsonDocument> CreateGetChunkOperation(long n)
      {
          var chunksCollectionNamespace = Bucket.GetChunksCollectionNamespace();
          var messageEncoderSettings = Bucket.GetMessageEncoderSettings();
  #pragma warning disable 618
          var filter = new BsonDocument
          {
              { "files_id", _idAsBsonValue },
              { "n", n }
          };
  #pragma warning restore 618

          return new FindOperation<BsonDocument>(
              chunksCollectionNamespace,
              BsonDocumentSerializer.Instance,
              messageEncoderSettings)
          {
              Filter = filter,
              Limit = -1
          };
      }

      /// <summary>
      /// Fetches chunk <paramref name="n"/> and makes it the cached chunk.
      /// </summary>
      private void GetChunk(long n, CancellationToken cancellationToken)
      {
          var operation = CreateGetChunkOperation(n);
          using (var cursor = operation.Execute(Binding, cancellationToken))
          {
              // pass the token on so that draining the cursor can be cancelled as well
              var documents = cursor.ToList(cancellationToken);
              _chunk = GetChunkHelper(n, documents);
              _n = n;
          }
      }

      /// <summary>
      /// Asynchronously fetches chunk <paramref name="n"/> and makes it the cached chunk.
      /// </summary>
      private async Task GetChunkAsync(long n, CancellationToken cancellationToken)
      {
          var operation = CreateGetChunkOperation(n);
          using (var cursor = await operation.ExecuteAsync(Binding, cancellationToken).ConfigureAwait(false))
          {
              // pass the token on so that draining the cursor can be cancelled as well
              var documents = await cursor.ToListAsync(cancellationToken).ConfigureAwait(false);
              _chunk = GetChunkHelper(n, documents);
              _n = n;
          }
      }

      /// <summary>
      /// Extracts the data of chunk <paramref name="n"/> from the query result and validates its size.
      /// </summary>
      /// <exception cref="GridFSChunkException">No chunk document was returned, or the data has an unexpected size.</exception>
      private byte[] GetChunkHelper(long n, List<BsonDocument> documents)
      {
          if (documents.Count == 0)
          {
  #pragma warning disable 618
              throw new GridFSChunkException(_idAsBsonValue, n, "missing");
  #pragma warning restore 618
          }

          var document = documents[0];
          var data = document["data"].AsBsonBinaryData.Bytes;

          // every chunk must be exactly ChunkSizeBytes long, except the chunk at index
          // Length / ChunkSizeBytes, which holds the remaining Length % ChunkSizeBytes bytes
          var chunkSizeBytes = FileInfo.ChunkSizeBytes;
          var lastChunk = FileInfo.Length / chunkSizeBytes;
          var expectedChunkSize = n == lastChunk ? FileInfo.Length % chunkSizeBytes : chunkSizeBytes;
          if (data.Length != expectedChunkSize)
          {
  #pragma warning disable 618
              throw new GridFSChunkException(_idAsBsonValue, n, "the wrong size");
  #pragma warning restore 618
          }

          return data;
      }

      /// <summary>
      /// Returns the part of the chunk containing the current position that starts at the current
      /// position, fetching the chunk first when it is not the cached one.
      /// </summary>
      private ArraySegment<byte> GetSegment(CancellationToken cancellationToken)
      {
          var n = GetCurrentChunkIndex();
          if (_n != n)
          {
              GetChunk(n, cancellationToken);
          }

          return CreateSegmentFromCurrentChunk();
      }

      /// <summary>
      /// Asynchronous counterpart of <see cref="GetSegment"/>.
      /// </summary>
      private async Task<ArraySegment<byte>> GetSegmentAsync(CancellationToken cancellationToken)
      {
          var n = GetCurrentChunkIndex();
          if (_n != n)
          {
              await GetChunkAsync(n, cancellationToken).ConfigureAwait(false);
          }

          return CreateSegmentFromCurrentChunk();
      }

      /// <summary>
      /// Computes the index of the chunk that contains the current position.
      /// </summary>
      private long GetCurrentChunkIndex()
      {
          return _position / FileInfo.ChunkSizeBytes;
      }

      /// <summary>
      /// Builds the segment of the cached chunk that runs from the current position to the end of the chunk.
      /// </summary>
      private ArraySegment<byte> CreateSegmentFromCurrentChunk()
      {
          var segmentOffset = (int)(_position % FileInfo.ChunkSizeBytes);
          var segmentCount = _chunk.Length - segmentOffset;
          return new ArraySegment<byte>(_chunk, segmentOffset, segmentCount);
      }
  }
  210. }