// GridFSForwardOnlyUploadStream.cs
  1. /* Copyright 2015-present MongoDB Inc.
  2. *
  3. * Licensed under the Apache License, Version 2.0 (the "License");
  4. * you may not use this file except in compliance with the License.
  5. * You may obtain a copy of the License at
  6. *
  7. * http://www.apache.org/licenses/LICENSE-2.0
  8. *
  9. * Unless required by applicable law or agreed to in writing, software
  10. * distributed under the License is distributed on an "AS IS" BASIS,
  11. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. * See the License for the specific language governing permissions and
  13. * limitations under the License.
  14. */
  15. using System;
  16. using System.Collections.Generic;
  17. using System.Linq;
  18. using System.Security.Cryptography;
  19. using System.Threading;
  20. using System.Threading.Tasks;
  21. using MongoDB.Bson;
  22. using MongoDB.Bson.Serialization;
  23. using MongoDB.Driver.Core.Bindings;
  24. using MongoDB.Driver.Core.Operations;
  25. using MongoDB.Shared;
  26. namespace MongoDB.Driver.GridFS
  27. {
  /// <summary>
  /// A write-only, non-seekable GridFS upload stream. Written bytes are buffered into
  /// chunks of <c>chunkSizeBytes</c>; once <c>batchSize</c> chunks are full they are inserted
  /// into the bucket's "chunks" collection. When the stream is closed (and was not aborted)
  /// the remaining chunks are written, followed by the "files" collection document.
  /// </summary>
  /// <typeparam name="TFileId">The type of the file id.</typeparam>
  internal class GridFSForwardOnlyUploadStream<TFileId> : GridFSUploadStream<TFileId>
  {
      #region static
      // private static fields
      private static readonly Task __completedTask = Task.FromResult(true);
      #endregion

      // fields
      private bool _aborted;
      private readonly List<string> _aliases; // can be null
      private List<byte[]> _batch; // chunks buffered since the last batch insert
      private long _batchPosition; // total bytes already turned into chunk documents
      private readonly int _batchSize; // number of chunks per batch insert
      private readonly IWriteBinding _binding;
      private readonly GridFSBucket<TFileId> _bucket;
      private readonly int _chunkSizeBytes;
      private bool _closed;
      private readonly string _contentType; // can be null
      private readonly bool _disableMD5;
      private bool _disposed;
      private readonly string _filename;
      private readonly TFileId _id;
      private readonly BsonValue _idAsBsonValue;
      private long _length; // total bytes written to the stream so far
      private readonly IncrementalMD5 _md5; // null when MD5 is disabled
      private readonly BsonDocument _metadata; // can be null

      // constructors
      /// <summary>
      /// Initializes a new instance of the upload stream.
      /// </summary>
      /// <param name="bucket">The bucket the file is uploaded to.</param>
      /// <param name="binding">The write binding; it is disposed together with the stream.</param>
      /// <param name="id">The file id.</param>
      /// <param name="filename">The filename stored in the files collection document.</param>
      /// <param name="metadata">The metadata (can be null, in which case it is omitted).</param>
      /// <param name="aliases">The aliases (can be null, in which case they are omitted).</param>
      /// <param name="contentType">The content type (can be null, in which case it is omitted).</param>
      /// <param name="chunkSizeBytes">The size of each chunk in bytes.</param>
      /// <param name="batchSize">The number of chunks buffered before they are inserted.</param>
      /// <param name="disableMD5">When true no MD5 hash is computed or stored.</param>
      public GridFSForwardOnlyUploadStream(
          GridFSBucket<TFileId> bucket,
          IWriteBinding binding,
          TFileId id,
          string filename,
          BsonDocument metadata,
          IEnumerable<string> aliases,
          string contentType,
          int chunkSizeBytes,
          int batchSize,
          bool disableMD5)
      {
          _bucket = bucket;
          _binding = binding;
          _id = id;
          _filename = filename;
          _metadata = metadata; // can be null
          _aliases = aliases == null ? null : aliases.ToList(); // can be null
          _contentType = contentType; // can be null
          _chunkSizeBytes = chunkSizeBytes;
          _batchSize = batchSize;

          _batch = new List<byte[]>();
          _md5 = disableMD5 ? null : IncrementalMD5.Create();
          _disableMD5 = disableMD5;

          // the id is serialized once up front; the BsonValue form is reused in every chunk document
          var idSerializer = bucket.Options.SerializerRegistry.GetSerializer<TFileId>();
          var idSerializationInfo = new BsonSerializationInfo("_id", idSerializer, typeof(TFileId));
          _idAsBsonValue = idSerializationInfo.SerializeValue(id);
      }

      // properties
      /// <summary>Always false: this stream cannot be read.</summary>
      public override bool CanRead => false;

      /// <summary>Always false: this stream cannot seek.</summary>
      public override bool CanSeek => false;

      /// <summary>Always true.</summary>
      public override bool CanWrite => true;

      /// <summary>Gets the file id passed to the constructor.</summary>
      public override TFileId Id => _id;

      /// <summary>Gets the number of bytes written so far.</summary>
      public override long Length => _length;

      /// <summary>
      /// Gets the current position, which is always the number of bytes written so far.
      /// Setting the position throws <see cref="NotSupportedException"/>.
      /// </summary>
      public override long Position
      {
          get
          {
              return _length;
          }
          set
          {
              throw new NotSupportedException();
          }
      }

      // methods
      /// <summary>
      /// Aborts the upload: marks the stream as aborted and executes a delete against the
      /// chunks collection for documents whose "files_id" matches this file's id.
      /// Calling it again after the first call is a no-op.
      /// </summary>
      /// <param name="cancellationToken">The cancellation token.</param>
      /// <exception cref="InvalidOperationException">The stream is closed.</exception>
      /// <exception cref="ObjectDisposedException">The stream is disposed.</exception>
      public override void Abort(CancellationToken cancellationToken = default(CancellationToken))
      {
          if (_aborted)
          {
              return;
          }
          ThrowIfClosedOrDisposed();
          // flagged before executing, so the stream counts as aborted even if the delete throws
          _aborted = true;

          var operation = CreateAbortOperation();
          operation.Execute(_binding, cancellationToken);
      }

      /// <summary>
      /// Asynchronous version of <see cref="Abort(CancellationToken)"/>.
      /// </summary>
      /// <param name="cancellationToken">The cancellation token.</param>
      /// <returns>A Task that completes when the delete operation has executed.</returns>
      public override async Task AbortAsync(CancellationToken cancellationToken = default(CancellationToken))
      {
          if (_aborted)
          {
              return;
          }
          ThrowIfClosedOrDisposed();
          // flagged before executing, so the stream counts as aborted even if the delete throws
          _aborted = true;

          var operation = CreateAbortOperation();
          await operation.ExecuteAsync(_binding, cancellationToken).ConfigureAwait(false);
      }

      /// <summary>
      /// Closes the stream (writing the final batch and the files collection document unless
      /// the upload was aborted) and then disposes it, even when closing throws.
      /// </summary>
      /// <param name="cancellationToken">The cancellation token.</param>
      public override void Close(CancellationToken cancellationToken)
      {
          try
          {
              CloseIfNotAlreadyClosed(cancellationToken);
          }
          finally
          {
              Dispose();
          }
      }

      /// <summary>
      /// Asynchronous version of <see cref="Close(CancellationToken)"/>.
      /// </summary>
      /// <param name="cancellationToken">The cancellation token.</param>
      /// <returns>A Task that completes when the stream has been closed and disposed.</returns>
      public override async Task CloseAsync(CancellationToken cancellationToken = default(CancellationToken))
      {
          try
          {
              await CloseIfNotAlreadyClosedAsync(cancellationToken).ConfigureAwait(false);
          }
          finally
          {
              Dispose();
          }
      }

      /// <summary>Does nothing; buffered chunks are only written in whole batches or on close.</summary>
      public override void Flush()
      {
          // do nothing
      }

      /// <summary>Does nothing and returns an already completed task.</summary>
      /// <param name="cancellationToken">The cancellation token (unused).</param>
      /// <returns>A completed Task.</returns>
      public override Task FlushAsync(CancellationToken cancellationToken)
      {
          // do nothing
          return __completedTask;
      }

      /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
      public override int Read(byte[] buffer, int offset, int count)
      {
          throw new NotSupportedException();
      }

      /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
      public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
      {
          throw new NotSupportedException();
      }

      /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
      public override long Seek(long offset, System.IO.SeekOrigin origin)
      {
          throw new NotSupportedException();
      }

      /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
      public override void SetLength(long value)
      {
          throw new NotSupportedException();
      }

      /// <summary>
      /// Copies <paramref name="count"/> bytes from <paramref name="buffer"/> into the buffered
      /// chunks, inserting a batch of chunks whenever the batch is full.
      /// </summary>
      /// <param name="buffer">The source buffer.</param>
      /// <param name="offset">The offset in <paramref name="buffer"/> of the first byte to write.</param>
      /// <param name="count">The number of bytes to write.</param>
      /// <exception cref="InvalidOperationException">The upload was aborted or the stream is closed.</exception>
      /// <exception cref="ObjectDisposedException">The stream is disposed.</exception>
      /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
      /// <exception cref="ArgumentException">The range exceeds the length of <paramref name="buffer"/>.</exception>
      public override void Write(byte[] buffer, int offset, int count)
      {
          ThrowIfAbortedClosedOrDisposed();
          // validate before touching any state so a bad range can never produce a partial write
          ValidateWriteArguments(buffer, offset, count);
          while (count > 0)
          {
              var chunk = GetCurrentChunk(CancellationToken.None);
              // copy at most what still fits in the current chunk
              var partialCount = Math.Min(count, chunk.Count);
              Buffer.BlockCopy(buffer, offset, chunk.Array, chunk.Offset, partialCount);
              offset += partialCount;
              count -= partialCount;
              _length += partialCount;
          }
      }

      /// <summary>
      /// Asynchronous version of <see cref="Write(byte[], int, int)"/>. Validation and state
      /// exceptions are reported through the returned Task.
      /// </summary>
      /// <param name="buffer">The source buffer.</param>
      /// <param name="offset">The offset in <paramref name="buffer"/> of the first byte to write.</param>
      /// <param name="count">The number of bytes to write.</param>
      /// <param name="cancellationToken">The cancellation token passed to batch inserts.</param>
      /// <returns>A Task that completes when all bytes have been buffered.</returns>
      public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
      {
          ThrowIfAbortedClosedOrDisposed();
          // validate before touching any state so a bad range can never produce a partial write
          ValidateWriteArguments(buffer, offset, count);
          while (count > 0)
          {
              var chunk = await GetCurrentChunkAsync(cancellationToken).ConfigureAwait(false);
              // copy at most what still fits in the current chunk
              var partialCount = Math.Min(count, chunk.Count);
              Buffer.BlockCopy(buffer, offset, chunk.Array, chunk.Offset, partialCount);
              offset += partialCount;
              count -= partialCount;
              _length += partialCount;
          }
      }

      // private methods
      // Runs the close logic at most once; the stream is marked closed even if closing throws.
      private void CloseIfNotAlreadyClosed(CancellationToken cancellationToken)
      {
          if (!_closed)
          {
              try
              {
                  CloseImplementation(cancellationToken);
              }
              finally
              {
                  _closed = true;
              }
          }
      }

      // Async counterpart of CloseIfNotAlreadyClosed.
      private async Task CloseIfNotAlreadyClosedAsync(CancellationToken cancellationToken)
      {
          if (!_closed)
          {
              try
              {
                  await CloseImplementationAsync(cancellationToken).ConfigureAwait(false);
              }
              finally
              {
                  _closed = true;
              }
          }
      }

      // Best-effort close used by Dispose(bool); only runs when disposing is true.
      private void CloseIfNotAlreadyClosedFromDispose(bool disposing)
      {
          if (disposing)
          {
              try
              {
                  CloseIfNotAlreadyClosed(CancellationToken.None);
              }
              catch
              {
                  // ignore any exceptions from CloseIfNotAlreadyClosed when called from Dispose
              }
          }
      }

      // Writes the remaining chunks and then the files collection document, unless aborted.
      private void CloseImplementation(CancellationToken cancellationToken)
      {
          if (!_aborted)
          {
              WriteFinalBatch(cancellationToken);
              WriteFilesCollectionDocument(cancellationToken);
          }
      }

      // Async counterpart of CloseImplementation.
      private async Task CloseImplementationAsync(CancellationToken cancellationToken)
      {
          if (!_aborted)
          {
              await WriteFinalBatchAsync(cancellationToken).ConfigureAwait(false);
              await WriteFilesCollectionDocumentAsync(cancellationToken).ConfigureAwait(false);
          }
      }

      // Builds the bulk write that deletes this file's documents from the chunks collection.
      private BulkMixedWriteOperation CreateAbortOperation()
      {
          var chunksCollectionNamespace = _bucket.GetChunksCollectionNamespace();
          var filter = new BsonDocument("files_id", _idAsBsonValue);
          // NOTE(review): Limit = 0 presumably means "no limit" (delete all matches) - confirm against DeleteRequest
          var deleteRequest = new DeleteRequest(filter) { Limit = 0 };
          var requests = new WriteRequest[] { deleteRequest };
          var messageEncoderSettings = _bucket.GetMessageEncoderSettings();
          return new BulkMixedWriteOperation(chunksCollectionNamespace, requests, messageEncoderSettings)
          {
              WriteConcern = _bucket.Options.WriteConcern
          };
      }

      // Builds the files collection document; optional elements are only added when present.
      private BsonDocument CreateFilesCollectionDocument()
      {
          var uploadDateTime = DateTime.UtcNow;

          return new BsonDocument
          {
              { "_id", _idAsBsonValue },
              { "length", _length },
              { "chunkSize", _chunkSizeBytes },
              { "uploadDate", uploadDateTime },
              { "md5", () => BsonUtils.ToHexString(_md5.GetHashAndReset()), !_disableMD5 },
              { "filename", _filename },
              { "contentType", _contentType, _contentType != null },
              { "aliases", () => new BsonArray(_aliases.Select(a => new BsonString(a))), _aliases != null },
              { "metadata", _metadata, _metadata != null }
          };
      }

      // Converts the buffered chunks into chunk documents. As a side effect this advances
      // _batchPosition and feeds every chunk into the MD5 hash (when enabled).
      private IEnumerable<BsonDocument> CreateWriteBatchChunkDocuments()
      {
          var chunkDocuments = new List<BsonDocument>();

          // index of the first chunk of this batch within the whole file
          var n = (int)(_batchPosition / _chunkSizeBytes);
          foreach (var chunk in _batch)
          {
              var chunkDocument = new BsonDocument
              {
                  { "_id", ObjectId.GenerateNewId() },
                  { "files_id", _idAsBsonValue },
                  { "n", n++ },
                  { "data", new BsonBinaryData(chunk, BsonBinarySubType.Binary) }
              };
              chunkDocuments.Add(chunkDocument);

              _batchPosition += chunk.Length;
              _md5?.AppendData(chunk, 0, chunk.Length);
          }

          return chunkDocuments;
      }

      /// <summary>
      /// Closes the stream on a best-effort basis (exceptions are ignored), then releases the
      /// MD5 hasher and the binding the first time it is called with disposing set to true.
      /// </summary>
      /// <param name="disposing">True when called from Dispose rather than from a finalizer.</param>
      protected override void Dispose(bool disposing)
      {
          CloseIfNotAlreadyClosedFromDispose(disposing);

          if (!_disposed)
          {
              _disposed = true;

              if (disposing)
              {
                  _md5?.Dispose();
                  _binding.Dispose();
              }
          }

          base.Dispose(disposing);
      }

      // Gets the "<bucket name>.chunks" collection.
      private IMongoCollection<BsonDocument> GetChunksCollection()
      {
          return GetCollection("chunks");
      }

      // Gets the "<bucket name>.<suffix>" collection, using the bucket's write concern or,
      // when that is null, the database's write concern.
      private IMongoCollection<BsonDocument> GetCollection(string suffix)
      {
          var database = _bucket.Database;
          var collectionName = _bucket.Options.BucketName + "." + suffix;
          var writeConcern = _bucket.Options.WriteConcern ?? database.Settings.WriteConcern;
          var settings = new MongoCollectionSettings { WriteConcern = writeConcern };
          return database.GetCollection<BsonDocument>(collectionName, settings);
      }

      // Returns the writable remainder of the chunk at the current position, first inserting
      // the buffered batch when it is already full.
      private ArraySegment<byte> GetCurrentChunk(CancellationToken cancellationToken)
      {
          var batchIndex = (int)((_length - _batchPosition) / _chunkSizeBytes);

          if (batchIndex == _batchSize)
          {
              // WriteBatch clears _batch after a successful insert
              WriteBatch(cancellationToken);
              batchIndex = 0;
          }

          return GetCurrentChunkSegment(batchIndex);
      }

      // Async counterpart of GetCurrentChunk.
      private async Task<ArraySegment<byte>> GetCurrentChunkAsync(CancellationToken cancellationToken)
      {
          var batchIndex = (int)((_length - _batchPosition) / _chunkSizeBytes);

          if (batchIndex == _batchSize)
          {
              // WriteBatchAsync clears _batch after a successful insert
              await WriteBatchAsync(cancellationToken).ConfigureAwait(false);
              batchIndex = 0;
          }

          return GetCurrentChunkSegment(batchIndex);
      }

      // Returns the unused tail of the chunk at batchIndex, allocating the chunk on first use.
      private ArraySegment<byte> GetCurrentChunkSegment(int batchIndex)
      {
          if (_batch.Count <= batchIndex)
          {
              _batch.Add(new byte[_chunkSizeBytes]);
          }

          var chunk = _batch[batchIndex];
          var offset = (int)(_length % _chunkSizeBytes);
          var count = _chunkSizeBytes - offset;
          return new ArraySegment<byte>(chunk, offset, count);
      }

      // Gets the "<bucket name>.files" collection.
      private IMongoCollection<BsonDocument> GetFilesCollection()
      {
          return GetCollection("files");
      }

      private void ThrowIfAbortedClosedOrDisposed()
      {
          if (_aborted)
          {
              throw new InvalidOperationException("The upload was aborted.");
          }
          ThrowIfClosedOrDisposed();
      }

      private void ThrowIfClosedOrDisposed()
      {
          if (_closed)
          {
              throw new InvalidOperationException("The stream is closed.");
          }
          ThrowIfDisposed();
      }

      private void ThrowIfDisposed()
      {
          if (_disposed)
          {
              throw new ObjectDisposedException(GetType().Name);
          }
      }

      // Replaces the last buffered chunk with a copy trimmed to the bytes actually written,
      // when the total length is not a multiple of the chunk size.
      private void TruncateFinalChunk()
      {
          var finalChunkSize = (int)(_length % _chunkSizeBytes);
          if (finalChunkSize > 0)
          {
              var finalChunk = _batch[_batch.Count - 1];
              if (finalChunk.Length != finalChunkSize)
              {
                  var truncatedFinalChunk = new byte[finalChunkSize];
                  Buffer.BlockCopy(finalChunk, 0, truncatedFinalChunk, 0, finalChunkSize);
                  _batch[_batch.Count - 1] = truncatedFinalChunk;
              }
          }
      }

      // Checks the (buffer, offset, count) triple of Write/WriteAsync before any state changes.
      private static void ValidateWriteArguments(byte[] buffer, int offset, int count)
      {
          if (buffer == null)
          {
              throw new ArgumentNullException(nameof(buffer));
          }
          if (offset < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(offset), "Value cannot be negative.");
          }
          if (count < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(count), "Value cannot be negative.");
          }
          // written as a subtraction so that offset + count cannot overflow
          if (buffer.Length - offset < count)
          {
              throw new ArgumentException("Offset and count exceed the length of the buffer.");
          }
      }

      // Inserts the buffered chunks into the chunks collection and empties the batch.
      private void WriteBatch(CancellationToken cancellationToken)
      {
          var chunksCollection = GetChunksCollection();
          var chunkDocuments = CreateWriteBatchChunkDocuments();
          chunksCollection.InsertMany(chunkDocuments, cancellationToken: cancellationToken);
          _batch.Clear();
      }

      // Async counterpart of WriteBatch.
      private async Task WriteBatchAsync(CancellationToken cancellationToken)
      {
          var chunksCollection = GetChunksCollection();
          var chunkDocuments = CreateWriteBatchChunkDocuments();
          await chunksCollection.InsertManyAsync(chunkDocuments, cancellationToken: cancellationToken).ConfigureAwait(false);
          _batch.Clear();
      }

      // Inserts the files collection document.
      private void WriteFilesCollectionDocument(CancellationToken cancellationToken)
      {
          var filesCollection = GetFilesCollection();
          var filesCollectionDocument = CreateFilesCollectionDocument();
          filesCollection.InsertOne(filesCollectionDocument, cancellationToken: cancellationToken);
      }

      // Async counterpart of WriteFilesCollectionDocument.
      private async Task WriteFilesCollectionDocumentAsync(CancellationToken cancellationToken)
      {
          var filesCollection = GetFilesCollection();
          var filesCollectionDocument = CreateFilesCollectionDocument();
          await filesCollection.InsertOneAsync(filesCollectionDocument, cancellationToken: cancellationToken).ConfigureAwait(false);
      }

      // Trims the last chunk to its real size and inserts whatever is still buffered.
      private void WriteFinalBatch(CancellationToken cancellationToken)
      {
          if (_batch.Count > 0)
          {
              TruncateFinalChunk();
              WriteBatch(cancellationToken);
          }
      }

      // Async counterpart of WriteFinalBatch.
      private async Task WriteFinalBatchAsync(CancellationToken cancellationToken)
      {
          if (_batch.Count > 0)
          {
              TruncateFinalChunk();
              await WriteBatchAsync(cancellationToken).ConfigureAwait(false);
          }
      }
  }
  462. }