GridFSForwardOnlyDownloadStream.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. /* Copyright 2016-present MongoDB Inc.
  2. *
  3. * Licensed under the Apache License, Version 2.0 (the "License");
  4. * you may not use this file except in compliance with the License.
  5. * You may obtain a copy of the License at
  6. *
  7. * http://www.apache.org/licenses/LICENSE-2.0
  8. *
  9. * Unless required by applicable law or agreed to in writing, software
  10. * distributed under the License is distributed on an "AS IS" BASIS,
  11. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. * See the License for the specific language governing permissions and
  13. * limitations under the License.
  14. */
  15. using System;
  16. using System.Collections.Generic;
  17. using System.IO;
  18. using System.Linq;
  19. using System.Security.Cryptography;
  20. using System.Threading;
  21. using System.Threading.Tasks;
  22. using MongoDB.Bson;
  23. using MongoDB.Bson.Serialization;
  24. using MongoDB.Bson.Serialization.Serializers;
  25. using MongoDB.Driver.Core.Bindings;
  26. using MongoDB.Driver.Core.Misc;
  27. using MongoDB.Driver.Core.Operations;
  28. using MongoDB.Shared;
  29. namespace MongoDB.Driver.GridFS
  30. {
  /// <summary>
  /// A forward-only (non-seekable) GridFS download stream. Chunks are fetched through a single
  /// cursor sorted by chunk number, and every batch is validated (chunk sequence, chunk size and,
  /// optionally, an incremental MD5 hash) as soon as it arrives.
  /// </summary>
  /// <typeparam name="TFileId">The type of the file identifier.</typeparam>
  internal class GridFSForwardOnlyDownloadStream<TFileId> : GridFSDownloadStreamBase<TFileId>
  {
      // private fields

      // Chunk documents of the batch currently being read (null until the first batch is fetched).
      private List<BsonDocument> _batch;
      // File offset of the first byte of _batch; advanced by the size of each batch that is replaced.
      private long _batchPosition;
      private readonly bool _checkMD5;
      // Cursor over the chunks collection; created lazily on the first read.
      private IAsyncCursor<BsonDocument> _cursor;
      private bool _disposed;
      private readonly BsonValue _idAsBsonValue;
      private readonly int _lastChunkNumber;
      private readonly int _lastChunkSize;
      // Only created when _checkMD5 is true.
      private readonly IncrementalMD5 _md5;
      // Chunk number ("n") the next validated chunk is expected to carry.
      private int _nextChunkNumber;
      private long _position;

      // constructors

      /// <summary>
      /// Initializes a new instance of the <see cref="GridFSForwardOnlyDownloadStream{TFileId}"/> class.
      /// </summary>
      /// <param name="bucket">The bucket; its serializer registry is used to serialize the file id.</param>
      /// <param name="binding">The read binding, passed through to the base class.</param>
      /// <param name="fileInfo">The file info (length, chunk size and id of the file to download).</param>
      /// <param name="checkMD5">Whether to hash the downloaded chunks and compare against the stored MD5 on close.</param>
      public GridFSForwardOnlyDownloadStream(
          GridFSBucket<TFileId> bucket,
          IReadBinding binding,
          GridFSFileInfo<TFileId> fileInfo,
          bool checkMD5)
          : base(bucket, binding, fileInfo)
      {
          _checkMD5 = checkMD5;
          if (_checkMD5)
          {
              _md5 = IncrementalMD5.Create();
          }

          _lastChunkNumber = (int)((fileInfo.Length - 1) / fileInfo.ChunkSizeBytes);
          _lastChunkSize = (int)(fileInfo.Length % fileInfo.ChunkSizeBytes);

          // A remainder of zero means the last chunk is a full-sized chunk.
          if (_lastChunkSize == 0)
          {
              _lastChunkSize = fileInfo.ChunkSizeBytes;
          }

          var idSerializer = bucket.Options.SerializerRegistry.GetSerializer<TFileId>();
          var idSerializationInfo = new BsonSerializationInfo("_id", idSerializer, typeof(TFileId));
          _idAsBsonValue = idSerializationInfo.SerializeValue(fileInfo.Id);
      }

      // public properties

      /// <summary>
      /// Gets a value indicating whether the stream supports seeking. Always false.
      /// </summary>
      public override bool CanSeek
      {
          get { return false; }
      }

      /// <summary>
      /// Gets the number of bytes read so far. Setting the position is not supported.
      /// </summary>
      /// <exception cref="NotSupportedException">Thrown by the setter.</exception>
      public override long Position
      {
          get
          {
              return _position;
          }
          set
          {
              throw new NotSupportedException();
          }
      }

      // methods

      /// <summary>
      /// Reads bytes from the file into <paramref name="buffer"/>, fetching chunk batches as needed.
      /// Keeps copying until <paramref name="count"/> bytes were read or the end of the file is reached.
      /// </summary>
      /// <param name="buffer">The destination buffer.</param>
      /// <param name="offset">The offset in <paramref name="buffer"/> at which to start writing.</param>
      /// <param name="count">The maximum number of bytes to read.</param>
      /// <returns>The number of bytes read; zero once the end of the file has been reached.</returns>
      public override int Read(byte[] buffer, int offset, int count)
      {
          Ensure.IsNotNull(buffer, nameof(buffer));
          Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset));
          Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count));
          ThrowIfDisposed();

          var bytesRead = 0;
          while (count > 0 && _position < FileInfo.Length)
          {
              // The segment covers the unread remainder of the chunk containing _position.
              var segment = GetSegment(CancellationToken.None);

              var partialCount = Math.Min(count, segment.Count);
              Buffer.BlockCopy(segment.Array, segment.Offset, buffer, offset, partialCount);

              bytesRead += partialCount;
              offset += partialCount;
              count -= partialCount;
              _position += partialCount;
          }

          return bytesRead;
      }

      /// <summary>
      /// Asynchronous counterpart of <see cref="Read(byte[], int, int)"/>.
      /// </summary>
      /// <param name="buffer">The destination buffer.</param>
      /// <param name="offset">The offset in <paramref name="buffer"/> at which to start writing.</param>
      /// <param name="count">The maximum number of bytes to read.</param>
      /// <param name="cancellationToken">The cancellation token, passed to the cursor operations.</param>
      /// <returns>A task whose result is the number of bytes read; zero once the end of the file has been reached.</returns>
      public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
      {
          Ensure.IsNotNull(buffer, nameof(buffer));
          Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset));
          Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count));
          ThrowIfDisposed();

          var bytesRead = 0;
          while (count > 0 && _position < FileInfo.Length)
          {
              // The segment covers the unread remainder of the chunk containing _position.
              var segment = await GetSegmentAsync(cancellationToken).ConfigureAwait(false);

              var partialCount = Math.Min(count, segment.Count);
              Buffer.BlockCopy(segment.Array, segment.Offset, buffer, offset, partialCount);

              bytesRead += partialCount;
              offset += partialCount;
              count -= partialCount;
              _position += partialCount;
          }

          return bytesRead;
      }

      /// <summary>
      /// Not supported: this stream is forward-only.
      /// </summary>
      /// <exception cref="NotSupportedException">Always thrown.</exception>
      public override long Seek(long offset, SeekOrigin origin)
      {
          throw new NotSupportedException();
      }

      // protected methods

      /// <summary>
      /// Verifies the MD5 hash when MD5 checking is enabled and the whole file has been read.
      /// A partially read file is not verified because its hash is incomplete.
      /// </summary>
      /// <param name="cancellationToken">The cancellation token (not used by this implementation).</param>
      protected override void CloseImplementation(CancellationToken cancellationToken)
      {
          if (_checkMD5 && _position == FileInfo.Length)
          {
              var md5 = BsonUtils.ToHexString(_md5.GetHashAndReset());
              if (!md5.Equals(FileInfo.MD5, StringComparison.OrdinalIgnoreCase))
              {
#pragma warning disable 618
                  throw new GridFSMD5Exception(_idAsBsonValue);
#pragma warning restore
              }
          }
      }

      /// <summary>
      /// Closes the stream if needed and releases the cursor and the MD5 hasher.
      /// </summary>
      /// <param name="disposing">True when called from Dispose, false when called from a finalizer.</param>
      protected override void Dispose(bool disposing)
      {
          CloseIfNotAlreadyClosedFromDispose(disposing);

          if (!_disposed)
          {
              if (disposing)
              {
                  if (_cursor != null)
                  {
                      _cursor.Dispose();
                  }
                  if (_md5 != null)
                  {
                      _md5.Dispose();
                  }
              }

              _disposed = true;
          }

          base.Dispose(disposing);
      }

      /// <summary>
      /// Throws if this stream has been disposed, then defers to the base class check.
      /// </summary>
      /// <exception cref="ObjectDisposedException">Thrown when the stream has been disposed.</exception>
      protected override void ThrowIfDisposed()
      {
          if (_disposed)
          {
              throw new ObjectDisposedException(GetType().Name);
          }
          base.ThrowIfDisposed();
      }

      // private methods

      /// <summary>
      /// Builds the find operation that returns this file's chunks ordered by ascending chunk number.
      /// </summary>
      private FindOperation<BsonDocument> CreateFirstBatchOperation()
      {
          var chunksCollectionNamespace = Bucket.GetChunksCollectionNamespace();
          var messageEncoderSettings = Bucket.GetMessageEncoderSettings();
#pragma warning disable 618
          var filter = new BsonDocument("files_id", _idAsBsonValue);
#pragma warning restore
          var sort = new BsonDocument("n", 1);

          return new FindOperation<BsonDocument>(
              chunksCollectionNamespace,
              BsonDocumentSerializer.Instance,
              messageEncoderSettings)
          {
              Filter = filter,
              Sort = sort
          };
      }

      /// <summary>
      /// Opens the chunks cursor and loads the first non-empty batch.
      /// </summary>
      private void GetFirstBatch(CancellationToken cancellationToken)
      {
          var operation = CreateFirstBatchOperation();
          _cursor = operation.Execute(Binding, cancellationToken);
          GetNextBatch(cancellationToken);
      }

      /// <summary>
      /// Asynchronously opens the chunks cursor and loads the first non-empty batch.
      /// </summary>
      private async Task GetFirstBatchAsync(CancellationToken cancellationToken)
      {
          var operation = CreateFirstBatchOperation();
          _cursor = await operation.ExecuteAsync(Binding, cancellationToken).ConfigureAwait(false);
          await GetNextBatchAsync(cancellationToken).ConfigureAwait(false);
      }

      /// <summary>
      /// Advances the cursor to the next non-empty batch and validates it. A null batch (cursor
      /// exhausted) is handed to <see cref="ProcessNextBatch"/>, which reports it as a missing chunk.
      /// </summary>
      private void GetNextBatch(CancellationToken cancellationToken)
      {
          List<BsonDocument> batch;
          do
          {
              var hasMore = _cursor.MoveNext(cancellationToken);
              batch = hasMore ? _cursor.Current.ToList() : null;
          }
          while (batch != null && batch.Count == 0); // skip empty batches

          ProcessNextBatch(batch);
      }

      /// <summary>
      /// Asynchronous counterpart of <see cref="GetNextBatch"/>.
      /// </summary>
      private async Task GetNextBatchAsync(CancellationToken cancellationToken)
      {
          List<BsonDocument> batch;
          do
          {
              var hasMore = await _cursor.MoveNextAsync(cancellationToken).ConfigureAwait(false);
              batch = hasMore ? _cursor.Current.ToList() : null;
          }
          while (batch != null && batch.Count == 0); // skip empty batches

          ProcessNextBatch(batch);
      }

      /// <summary>
      /// Makes <paramref name="batch"/> the current batch after validating that its chunks continue
      /// the expected sequence and have the expected sizes; also feeds them to the MD5 hasher when enabled.
      /// </summary>
      /// <param name="batch">The next non-empty batch of chunk documents, or null when the cursor is exhausted.</param>
      /// <exception cref="GridFSChunkException">Thrown when a chunk is missing or has the wrong size.</exception>
      private void ProcessNextBatch(List<BsonDocument> batch)
      {
          // This method is only reached while more data is needed, so an exhausted cursor means a missing chunk.
          if (batch == null)
          {
#pragma warning disable 618
              throw new GridFSChunkException(_idAsBsonValue, _nextChunkNumber, "missing");
#pragma warning restore
          }

          var previousBatch = _batch;
          _batch = batch;

          if (previousBatch != null)
          {
              // Widen before multiplying so the product is computed in 64-bit arithmetic.
              _batchPosition += (long)previousBatch.Count * FileInfo.ChunkSizeBytes;
          }

          // Tolerate (and drop) a single zero-length chunk stored right after the last real chunk.
          var lastChunkInBatch = _batch[_batch.Count - 1];
          if (lastChunkInBatch["n"].ToInt32() == _lastChunkNumber + 1 && lastChunkInBatch["data"].AsBsonBinaryData.Bytes.Length == 0)
          {
              _batch.RemoveAt(_batch.Count - 1);
          }

          // If the trailing empty chunk was the only document, the chunk we actually need is absent.
          // Report it as missing rather than letting the caller index into an empty batch.
          if (_batch.Count == 0)
          {
#pragma warning disable 618
              throw new GridFSChunkException(_idAsBsonValue, _nextChunkNumber, "missing");
#pragma warning restore
          }

          foreach (var chunk in _batch)
          {
              var n = chunk["n"].ToInt32();
              var bytes = chunk["data"].AsBsonBinaryData.Bytes;

              if (n != _nextChunkNumber)
              {
#pragma warning disable 618
                  throw new GridFSChunkException(_idAsBsonValue, _nextChunkNumber, "missing");
#pragma warning restore
              }
              _nextChunkNumber++;

              var expectedChunkSize = n == _lastChunkNumber ? _lastChunkSize : FileInfo.ChunkSizeBytes;
              if (bytes.Length != expectedChunkSize)
              {
                  // Report the offending chunk (n), not the already incremented _nextChunkNumber.
#pragma warning disable 618
                  throw new GridFSChunkException(_idAsBsonValue, n, "the wrong size");
#pragma warning restore
              }

              if (_checkMD5)
              {
                  _md5.AppendData(bytes, 0, bytes.Length);
              }
          }
      }

      /// <summary>
      /// Returns the unread remainder of the chunk that contains the current position,
      /// fetching the first or next batch when required.
      /// </summary>
      private ArraySegment<byte> GetSegment(CancellationToken cancellationToken)
      {
          var batchIndex = (int)((_position - _batchPosition) / FileInfo.ChunkSizeBytes);

          if (_cursor == null)
          {
              GetFirstBatch(cancellationToken);
          }
          else if (batchIndex == _batch.Count)
          {
              // The current batch is fully consumed; the needed chunk is first in the next batch.
              GetNextBatch(cancellationToken);
              batchIndex = 0;
          }

          return GetSegmentHelper(batchIndex);
      }

      /// <summary>
      /// Asynchronous counterpart of <see cref="GetSegment"/>.
      /// </summary>
      private async Task<ArraySegment<byte>> GetSegmentAsync(CancellationToken cancellationToken)
      {
          var batchIndex = (int)((_position - _batchPosition) / FileInfo.ChunkSizeBytes);

          if (_cursor == null)
          {
              await GetFirstBatchAsync(cancellationToken).ConfigureAwait(false);
          }
          else if (batchIndex == _batch.Count)
          {
              // The current batch is fully consumed; the needed chunk is first in the next batch.
              await GetNextBatchAsync(cancellationToken).ConfigureAwait(false);
              batchIndex = 0;
          }

          return GetSegmentHelper(batchIndex);
      }

      /// <summary>
      /// Builds the segment of the chunk at <paramref name="batchIndex"/> that starts at the
      /// current position within that chunk and runs to the end of the chunk's data.
      /// </summary>
      private ArraySegment<byte> GetSegmentHelper(int batchIndex)
      {
          var bytes = _batch[batchIndex]["data"].AsBsonBinaryData.Bytes;
          var segmentOffset = (int)(_position % FileInfo.ChunkSizeBytes);
          var segmentCount = bytes.Length - segmentOffset;
          return new ArraySegment<byte>(bytes, segmentOffset, segmentCount);
      }
  }
  300. }