/* Copyright 2016-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Bson.IO;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Serializers;
using MongoDB.Driver.Core.Bindings;
using MongoDB.Driver.Core.Clusters;
using MongoDB.Driver.Core.Clusters.ServerSelectors;
using MongoDB.Driver.Core.Misc;
using MongoDB.Driver.Core.Operations;
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;

namespace MongoDB.Driver.GridFS
{
    /// <summary>
    /// Represents a GridFS bucket.
    /// </summary>
    /// <typeparam name="TFileId">The type of the file identifier.</typeparam>
    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] // we can get away with not calling Dispose on our SemaphoreSlim
    public class GridFSBucket<TFileId> : IGridFSBucket<TFileId>
    {
        // fields
        private readonly ICluster _cluster;
        private readonly IMongoDatabase _database;
        // set once the index check has completed; only touched while holding _ensureIndexesSemaphore
        private bool _ensureIndexesDone;
        // serializes the one-time index check across concurrent uploads
        private readonly SemaphoreSlim _ensureIndexesSemaphore = new SemaphoreSlim(1);
        private readonly IBsonSerializer<GridFSFileInfo<TFileId>> _fileInfoSerializer;
        private readonly BsonSerializationInfo _idSerializationInfo;
        private readonly ImmutableGridFSBucketOptions _options;

        // constructors
        /// <summary>
        /// Initializes a new instance of the <see cref="GridFSBucket{TFileId}"/> class.
        /// </summary>
        /// <param name="database">The database.</param>
        /// <param name="options">The options.</param>
public GridFSBucket(IMongoDatabase database, GridFSBucketOptions options = null)
        {
            _database = Ensure.IsNotNull(database, nameof(database));
            // take an immutable snapshot of the caller's options (or the defaults when none were given)
            _options = options == null ? ImmutableGridFSBucketOptions.Defaults : new ImmutableGridFSBucketOptions(options);

            _cluster = database.Client.Cluster;

            // the same id serializer is used for "_id" values and for deserializing file info documents
            var idSerializer = _options.SerializerRegistry.GetSerializer<TFileId>();
            _idSerializationInfo = new BsonSerializationInfo("_id", idSerializer, typeof(TFileId));
            _fileInfoSerializer = new GridFSFileInfoSerializer<TFileId>(idSerializer);
        }

        // properties
        /// <inheritdoc />
        public IMongoDatabase Database
        {
            get { return _database; }
        }

        /// <inheritdoc />
        public ImmutableGridFSBucketOptions Options
        {
            get { return _options; }
        }

        // methods
        /// <inheritdoc />
        public void Delete(TFileId id, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            using (var binding = GetSingleServerReadWriteBinding(cancellationToken))
            {
                var filesCollectionDeleteOperation = CreateDeleteFileOperation(id);
                var filesCollectionDeleteResult = filesCollectionDeleteOperation.Execute(binding, cancellationToken);

                // the chunks delete always runs, even when no files document matched
                var chunksDeleteOperation = CreateDeleteChunksOperation(id);
                chunksDeleteOperation.Execute(binding, cancellationToken);

                // "not found" is only reported after the chunks delete has been attempted
                if (filesCollectionDeleteResult.DeletedCount == 0)
                {
                    throw new GridFSFileNotFoundException(_idSerializationInfo.SerializeValue(id));
                }
            }
        }

        /// <inheritdoc />
        public async Task DeleteAsync(TFileId id, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            using (var binding = await GetSingleServerReadWriteBindingAsync(cancellationToken).ConfigureAwait(false))
            {
                var filesCollectionDeleteOperation = CreateDeleteFileOperation(id);
                var filesCollectionDeleteResult = await filesCollectionDeleteOperation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);

                // the chunks delete always runs, even when no files document matched
                var chunksDeleteOperation = CreateDeleteChunksOperation(id);
                await chunksDeleteOperation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);

                if (filesCollectionDeleteResult.DeletedCount == 0)
                {
throw new GridFSFileNotFoundException(_idSerializationInfo.SerializeValue(id));
                }
            }
        }

        /// <inheritdoc />
        public byte[] DownloadAsBytes(TFileId id, GridFSDownloadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            options = options ?? new GridFSDownloadOptions();
            // the file info lookup and the download share one binding
            using (var binding = GetSingleServerReadBinding(cancellationToken))
            {
                var fileInfo = GetFileInfo(binding, id, cancellationToken);
                return DownloadAsBytesHelper(binding, fileInfo, options, cancellationToken);
            }
        }

        /// <inheritdoc />
        public async Task<byte[]> DownloadAsBytesAsync(TFileId id, GridFSDownloadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            options = options ?? new GridFSDownloadOptions();
            using (var binding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
            {
                var fileInfo = await GetFileInfoAsync(binding, id, cancellationToken).ConfigureAwait(false);
                return await DownloadAsBytesHelperAsync(binding, fileInfo, options, cancellationToken).ConfigureAwait(false);
            }
        }

        /// <inheritdoc />
        public byte[] DownloadAsBytesByName(string filename, GridFSDownloadByNameOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(filename, nameof(filename));
            options = options ?? new GridFSDownloadByNameOptions();
            using (var binding = GetSingleServerReadBinding(cancellationToken))
            {
                // options.Revision selects which stored revision of the filename is returned
                var fileInfo = GetFileInfoByName(binding, filename, options.Revision, cancellationToken);
                return DownloadAsBytesHelper(binding, fileInfo, options, cancellationToken);
            }
        }

        /// <inheritdoc />
        public async Task<byte[]> DownloadAsBytesByNameAsync(string filename, GridFSDownloadByNameOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(filename, nameof(filename));
            options = options ??
new GridFSDownloadByNameOptions();
            using (var binding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
            {
                var fileInfo = await GetFileInfoByNameAsync(binding, filename, options.Revision, cancellationToken).ConfigureAwait(false);
                return await DownloadAsBytesHelperAsync(binding, fileInfo, options, cancellationToken).ConfigureAwait(false);
            }
        }

        /// <inheritdoc />
        public void DownloadToStream(TFileId id, Stream destination, GridFSDownloadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            Ensure.IsNotNull(destination, nameof(destination));
            options = options ?? new GridFSDownloadOptions();
            // the file info lookup and the download share one binding
            using (var binding = GetSingleServerReadBinding(cancellationToken))
            {
                var fileInfo = GetFileInfo(binding, id, cancellationToken);
                DownloadToStreamHelper(binding, fileInfo, destination, options, cancellationToken);
            }
        }

        /// <inheritdoc />
        public async Task DownloadToStreamAsync(TFileId id, Stream destination, GridFSDownloadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            Ensure.IsNotNull(destination, nameof(destination));
            options = options ?? new GridFSDownloadOptions();
            using (var binding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
            {
                var fileInfo = await GetFileInfoAsync(binding, id, cancellationToken).ConfigureAwait(false);
                await DownloadToStreamHelperAsync(binding, fileInfo, destination, options, cancellationToken).ConfigureAwait(false);
            }
        }

        /// <inheritdoc />
        public void DownloadToStreamByName(string filename, Stream destination, GridFSDownloadByNameOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(filename, nameof(filename));
            Ensure.IsNotNull(destination, nameof(destination));
            options = options ??
new GridFSDownloadByNameOptions();
            using (var binding = GetSingleServerReadBinding(cancellationToken))
            {
                var fileInfo = GetFileInfoByName(binding, filename, options.Revision, cancellationToken);
                DownloadToStreamHelper(binding, fileInfo, destination, options, cancellationToken);
            }
        }

        /// <inheritdoc />
        public async Task DownloadToStreamByNameAsync(string filename, Stream destination, GridFSDownloadByNameOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(filename, nameof(filename));
            Ensure.IsNotNull(destination, nameof(destination));
            options = options ?? new GridFSDownloadByNameOptions();
            using (var binding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
            {
                var fileInfo = await GetFileInfoByNameAsync(binding, filename, options.Revision, cancellationToken).ConfigureAwait(false);
                await DownloadToStreamHelperAsync(binding, fileInfo, destination, options, cancellationToken).ConfigureAwait(false);
            }
        }

        /// <inheritdoc />
        public void Drop(CancellationToken cancellationToken = default(CancellationToken))
        {
            var filesCollectionNamespace = this.GetFilesCollectionNamespace();
            var chunksCollectionNamespace = this.GetChunksCollectionNamespace();
            var messageEncoderSettings = this.GetMessageEncoderSettings();

            using (var binding = GetSingleServerReadWriteBinding(cancellationToken))
            {
                // the files collection is dropped first, then the chunks collection
                var filesCollectionDropOperation = CreateDropCollectionOperation(filesCollectionNamespace, messageEncoderSettings);
                filesCollectionDropOperation.Execute(binding, cancellationToken);

                var chunksCollectionDropOperation = CreateDropCollectionOperation(chunksCollectionNamespace, messageEncoderSettings);
                chunksCollectionDropOperation.Execute(binding, cancellationToken);
            }
        }

        /// <inheritdoc />
        public async Task DropAsync(CancellationToken cancellationToken = default(CancellationToken))
        {
            var filesCollectionNamespace = this.GetFilesCollectionNamespace();
            var chunksCollectionNamespace = this.GetChunksCollectionNamespace();
            var messageEncoderSettings =
this.GetMessageEncoderSettings();

            using (var binding = await GetSingleServerReadWriteBindingAsync(cancellationToken).ConfigureAwait(false))
            {
                // the files collection is dropped first, then the chunks collection
                var filesCollectionDropOperation = CreateDropCollectionOperation(filesCollectionNamespace, messageEncoderSettings);
                await filesCollectionDropOperation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);

                var chunksCollectionDropOperation = CreateDropCollectionOperation(chunksCollectionNamespace, messageEncoderSettings);
                await chunksCollectionDropOperation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);
            }
        }

        /// <inheritdoc />
        public IAsyncCursor<GridFSFileInfo<TFileId>> Find(FilterDefinition<GridFSFileInfo<TFileId>> filter, GridFSFindOptions<TFileId> options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(filter, nameof(filter));
            options = options ?? new GridFSFindOptions<TFileId>();

            var operation = CreateFindOperation(filter, options);
            // NOTE(review): the binding is disposed while the returned cursor is still live;
            // presumably the cursor holds its own server/channel reference - confirm.
            using (var binding = GetSingleServerReadBinding(cancellationToken))
            {
                return operation.Execute(binding, cancellationToken);
            }
        }

        /// <inheritdoc />
        public async Task<IAsyncCursor<GridFSFileInfo<TFileId>>> FindAsync(FilterDefinition<GridFSFileInfo<TFileId>> filter, GridFSFindOptions<TFileId> options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(filter, nameof(filter));
            options = options ?? new GridFSFindOptions<TFileId>();

            var operation = CreateFindOperation(filter, options);
            using (var binding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
            {
                return await operation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);
            }
        }

        /// <inheritdoc />
        public GridFSDownloadStream<TFileId> OpenDownloadStream(TFileId id, GridFSDownloadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            options = options ??
new GridFSDownloadOptions();
            using (var binding = GetSingleServerReadBinding(cancellationToken))
            {
                var fileInfo = GetFileInfo(binding, id, cancellationToken);
                // the stream receives a fork because this method's binding is disposed on return
                return CreateDownloadStream(binding.Fork(), fileInfo, options, cancellationToken);
            }
        }

        /// <inheritdoc />
        public async Task<GridFSDownloadStream<TFileId>> OpenDownloadStreamAsync(TFileId id, GridFSDownloadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            options = options ?? new GridFSDownloadOptions();
            using (var binding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
            {
                var fileInfo = await GetFileInfoAsync(binding, id, cancellationToken).ConfigureAwait(false);
                return CreateDownloadStream(binding.Fork(), fileInfo, options, cancellationToken);
            }
        }

        /// <inheritdoc />
        public GridFSDownloadStream<TFileId> OpenDownloadStreamByName(string filename, GridFSDownloadByNameOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(filename, nameof(filename));
            options = options ?? new GridFSDownloadByNameOptions();
            using (var binding = GetSingleServerReadBinding(cancellationToken))
            {
                var fileInfo = GetFileInfoByName(binding, filename, options.Revision, cancellationToken);
                // forward the token like the by-id overloads do
                return CreateDownloadStream(binding.Fork(), fileInfo, options, cancellationToken);
            }
        }

        /// <inheritdoc />
        public async Task<GridFSDownloadStream<TFileId>> OpenDownloadStreamByNameAsync(string filename, GridFSDownloadByNameOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(filename, nameof(filename));
            options = options ??
new GridFSDownloadByNameOptions();
            using (var binding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
            {
                var fileInfo = await GetFileInfoByNameAsync(binding, filename, options.Revision, cancellationToken).ConfigureAwait(false);
                // forward the token like the by-id overloads do
                return CreateDownloadStream(binding.Fork(), fileInfo, options, cancellationToken);
            }
        }

        /// <inheritdoc />
        public GridFSUploadStream<TFileId> OpenUploadStream(TFileId id, string filename, GridFSUploadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            Ensure.IsNotNull(filename, nameof(filename));
            options = options ?? new GridFSUploadOptions();

            using (var binding = GetSingleServerReadWriteBinding(cancellationToken))
            {
                // run the one-time index check before handing out an upload stream
                EnsureIndexes(binding, cancellationToken);
                // CreateUploadStream forks the binding, so disposing this one on return is safe
                return CreateUploadStream(binding, id, filename, options);
            }
        }

        /// <inheritdoc />
        public async Task<GridFSUploadStream<TFileId>> OpenUploadStreamAsync(TFileId id, string filename, GridFSUploadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            Ensure.IsNotNull(filename, nameof(filename));
            options = options ??
new GridFSUploadOptions();

            using (var binding = await GetSingleServerReadWriteBindingAsync(cancellationToken).ConfigureAwait(false))
            {
                // run the one-time index check before handing out an upload stream
                await EnsureIndexesAsync(binding, cancellationToken).ConfigureAwait(false);
                return CreateUploadStream(binding, id, filename, options);
            }
        }

        /// <inheritdoc />
        public void Rename(TFileId id, string newFilename, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            Ensure.IsNotNull(newFilename, nameof(newFilename));
            var renameOperation = CreateRenameOperation(id, newFilename);
            using (var binding = GetSingleServerReadWriteBinding(cancellationToken))
            {
                var result = renameOperation.Execute(binding, cancellationToken);

                // "not found" is only reported when the result exposes a modified count and it is zero
                if (result.IsModifiedCountAvailable && result.ModifiedCount == 0)
                {
                    throw new GridFSFileNotFoundException(_idSerializationInfo.SerializeValue(id));
                }
            }
        }

        /// <inheritdoc />
        public async Task RenameAsync(TFileId id, string newFilename, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            Ensure.IsNotNull(newFilename, nameof(newFilename));
            var renameOperation = CreateRenameOperation(id, newFilename);
            using (var binding = await GetSingleServerReadWriteBindingAsync(cancellationToken).ConfigureAwait(false))
            {
                var result = await renameOperation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);

                if (result.IsModifiedCountAvailable && result.ModifiedCount == 0)
                {
                    throw new GridFSFileNotFoundException(_idSerializationInfo.SerializeValue(id));
                }
            }
        }

        /// <inheritdoc />
        public void UploadFromBytes(TFileId id, string filename, byte[] source, GridFSUploadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            Ensure.IsNotNull(filename, nameof(filename));
            Ensure.IsNotNull(source, nameof(source));
            options = options ??
new GridFSUploadOptions();

            // wrap the byte array in a stream and reuse the stream-based upload
            using (var sourceStream = new MemoryStream(source))
            {
                UploadFromStream(id, filename, sourceStream, options, cancellationToken);
            }
        }

        /// <inheritdoc />
        public async Task UploadFromBytesAsync(TFileId id, string filename, byte[] source, GridFSUploadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            Ensure.IsNotNull(filename, nameof(filename));
            Ensure.IsNotNull(source, nameof(source));
            options = options ?? new GridFSUploadOptions();

            using (var sourceStream = new MemoryStream(source))
            {
                await UploadFromStreamAsync(id, filename, sourceStream, options, cancellationToken).ConfigureAwait(false);
            }
        }

        /// <inheritdoc />
        public void UploadFromStream(TFileId id, string filename, Stream source, GridFSUploadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            Ensure.IsNotNull(filename, nameof(filename));
            Ensure.IsNotNull(source, nameof(source));
            options = options ?? new GridFSUploadOptions();

            using (var destination = OpenUploadStream(id, filename, options, cancellationToken))
            {
                // copy one chunk-sized buffer at a time; per-upload chunk size overrides the bucket default
                var chunkSizeBytes = options.ChunkSizeBytes ?? _options.ChunkSizeBytes;
                var buffer = new byte[chunkSizeBytes];

                while (true)
                {
                    int bytesRead;
                    try
                    {
                        bytesRead = source.Read(buffer, 0, buffer.Length);
                    }
                    catch
                    {
                        // the source failed: abort the upload, then rethrow the source's exception
                        try
                        {
                            destination.Abort();
                        }
                        catch
                        {
                            // ignore any exceptions because we're going to rethrow the original exception
                        }
                        throw;
                    }

                    // a zero-length read marks the end of the source stream
                    if (bytesRead == 0)
                    {
                        break;
                    }
                    destination.Write(buffer, 0, bytesRead);
                }

                destination.Close(cancellationToken);
            }
        }

        /// <inheritdoc />
        public async Task UploadFromStreamAsync(TFileId id, string filename, Stream source, GridFSUploadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull((object)id, nameof(id));
            Ensure.IsNotNull(filename, nameof(filename));
            Ensure.IsNotNull(source, nameof(source));
            options = options ??
new GridFSUploadOptions();

            using (var destination = await OpenUploadStreamAsync(id, filename, options, cancellationToken).ConfigureAwait(false))
            {
                // copy one chunk-sized buffer at a time; per-upload chunk size overrides the bucket default
                var chunkSizeBytes = options.ChunkSizeBytes ?? _options.ChunkSizeBytes;
                var buffer = new byte[chunkSizeBytes];

                while (true)
                {
                    int bytesRead;
                    try
                    {
                        bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
                    }
                    catch
                    {
                        // Awaiting inside a catch block is allowed since C# 6 (this file already uses nameof),
                        // so abort here and rethrow with "throw;" to keep the original stack trace.
                        try
                        {
                            await destination.AbortAsync().ConfigureAwait(false);
                        }
                        catch
                        {
                            // ignore any exceptions because we're going to rethrow the original exception
                        }
                        throw;
                    }

                    // a zero-length read marks the end of the source stream
                    if (bytesRead == 0)
                    {
                        break;
                    }
                    await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
                }

                await destination.CloseAsync(cancellationToken).ConfigureAwait(false);
            }
        }

        // private methods

        // Returns true when the listed indexes contain one keyed on { files_id: 1, n: 1 }.
        private bool ChunksCollectionIndexesExist(List<BsonDocument> indexes)
        {
            var key = new BsonDocument { { "files_id", 1 }, { "n", 1 } };
            return IndexExists(indexes, key);
        }

        // Lists the chunks collection indexes and checks for the { files_id: 1, n: 1 } index.
        private bool ChunksCollectionIndexesExist(IReadBindingHandle binding, CancellationToken cancellationToken)
        {
            var indexes = ListIndexes(binding, this.GetChunksCollectionNamespace(), cancellationToken);
            return ChunksCollectionIndexesExist(indexes);
        }

        // Async counterpart of ChunksCollectionIndexesExist(binding, cancellationToken).
        private async Task<bool> ChunksCollectionIndexesExistAsync(IReadBindingHandle binding, CancellationToken cancellationToken)
        {
            var indexes = await ListIndexesAsync(binding, this.GetChunksCollectionNamespace(), cancellationToken).ConfigureAwait(false);
            return ChunksCollectionIndexesExist(indexes);
        }

        // Creates the chunks collection index.
        private void CreateChunksCollectionIndexes(IReadWriteBindingHandle binding, CancellationToken cancellationToken)
        {
            var operation = CreateCreateChunksCollectionIndexesOperation();
            operation.Execute(binding, cancellationToken);
        }

        // Async counterpart of CreateChunksCollectionIndexes.
        private async Task CreateChunksCollectionIndexesAsync(IReadWriteBindingHandle binding, CancellationToken
cancellationToken)
        {
            var operation = CreateCreateChunksCollectionIndexesOperation();
            await operation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);
        }

        // Builds the operation that creates the unique { files_id: 1, n: 1 } index on the chunks collection.
        internal CreateIndexesOperation CreateCreateChunksCollectionIndexesOperation()
        {
            var collectionNamespace = this.GetChunksCollectionNamespace();
            var requests = new[] { new CreateIndexRequest(new BsonDocument { { "files_id", 1 }, { "n", 1 } }) { Unique = true } };
            var messageEncoderSettings = this.GetMessageEncoderSettings();
            return new CreateIndexesOperation(collectionNamespace, requests, messageEncoderSettings)
            {
                // bucket-level write concern wins over the database default
                WriteConcern = _options.WriteConcern ?? _database.Settings.WriteConcern
            };
        }

        // Builds the operation that creates the { filename: 1, uploadDate: 1 } index on the files collection.
        internal CreateIndexesOperation CreateCreateFilesCollectionIndexesOperation()
        {
            var collectionNamespace = this.GetFilesCollectionNamespace();
            var requests = new[] { new CreateIndexRequest(new BsonDocument { { "filename", 1 }, { "uploadDate", 1 } }) };
            var messageEncoderSettings = this.GetMessageEncoderSettings();
            return new CreateIndexesOperation(collectionNamespace, requests, messageEncoderSettings)
            {
                WriteConcern = _options.WriteConcern ?? _database.Settings.WriteConcern
            };
        }

        // Builds the write operation that deletes the chunks whose files_id equals the given id.
        private BulkMixedWriteOperation CreateDeleteChunksOperation(TFileId id)
        {
            var filter = new BsonDocument("files_id", _idSerializationInfo.SerializeValue(id));
            return new BulkMixedWriteOperation(
                this.GetChunksCollectionNamespace(),
                // NOTE(review): Limit = 0 presumably means "no limit" (delete every match) - confirm against DeleteRequest
                new[] { new DeleteRequest(filter) { Limit = 0 } },
                this.GetMessageEncoderSettings());
        }

        // Creates the download stream implementation selected by the CheckMD5/Seekable options.
        private GridFSDownloadStream<TFileId> CreateDownloadStream(IReadBindingHandle binding, GridFSFileInfo<TFileId> fileInfo, GridFSDownloadOptions options, CancellationToken cancellationToken = default(CancellationToken))
        {
            var checkMD5 = options.CheckMD5 ?? false;
            var seekable = options.Seekable ??
false;
            if (checkMD5 && seekable)
            {
                throw new ArgumentException("CheckMD5 can only be used when Seekable is false.");
            }

            if (seekable)
            {
                return new GridFSSeekableDownloadStream<TFileId>(this, binding, fileInfo);
            }
            else
            {
                return new GridFSForwardOnlyDownloadStream<TFileId>(this, binding, fileInfo, checkMD5);
            }
        }

        // Builds a drop-collection operation using the bucket's (or else the database's) write concern.
        internal DropCollectionOperation CreateDropCollectionOperation(CollectionNamespace collectionNamespace, MessageEncoderSettings messageEncoderSettings)
        {
            return new DropCollectionOperation(collectionNamespace, messageEncoderSettings)
            {
                WriteConcern = _options.WriteConcern ?? _database.Settings.WriteConcern
            };
        }

        // Builds the write operation that deletes the files collection document with the given _id.
        private BulkMixedWriteOperation CreateDeleteFileOperation(TFileId id)
        {
            var filter = new BsonDocument("_id", _idSerializationInfo.SerializeValue(id));
            return new BulkMixedWriteOperation(
                this.GetFilesCollectionNamespace(),
                new[] { new DeleteRequest(filter) },
                this.GetMessageEncoderSettings());
        }

        // Creates the files collection index.
        private void CreateFilesCollectionIndexes(IReadWriteBindingHandle binding, CancellationToken cancellationToken)
        {
            var operation = CreateCreateFilesCollectionIndexesOperation();
            operation.Execute(binding, cancellationToken);
        }

        // Async counterpart of CreateFilesCollectionIndexes.
        private async Task CreateFilesCollectionIndexesAsync(IReadWriteBindingHandle binding, CancellationToken cancellationToken)
        {
            var operation = CreateCreateFilesCollectionIndexesOperation();
            await operation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);
        }

        // Builds the find operation over the files collection from the caller's filter and find options.
        private FindOperation<GridFSFileInfo<TFileId>> CreateFindOperation(FilterDefinition<GridFSFileInfo<TFileId>> filter, GridFSFindOptions<TFileId> options)
        {
            var filesCollectionNamespace = this.GetFilesCollectionNamespace();
            var messageEncoderSettings = this.GetMessageEncoderSettings();
            var renderedFilter = filter.Render(_fileInfoSerializer, _options.SerializerRegistry);
            var renderedSort = options.Sort == null ?
null : options.Sort.Render(_fileInfoSerializer, _options.SerializerRegistry);

            return new FindOperation<GridFSFileInfo<TFileId>>(
                filesCollectionNamespace,
                _fileInfoSerializer,
                messageEncoderSettings)
            {
                BatchSize = options.BatchSize,
                Filter = renderedFilter,
                Limit = options.Limit,
                MaxTime = options.MaxTime,
                NoCursorTimeout = options.NoCursorTimeout ?? false,
                ReadConcern = GetReadConcern(),
                Skip = options.Skip,
                Sort = renderedSort
            };
        }

        // Builds the find operation that locates one revision of a filename.
        // revision >= 0 counts from the oldest upload (0 = first); revision < 0 counts from the newest (-1 = latest).
        private FindOperation<GridFSFileInfo<TFileId>> CreateGetFileInfoByNameOperation(string filename, int revision)
        {
            var collectionNamespace = this.GetFilesCollectionNamespace();
            var messageEncoderSettings = this.GetMessageEncoderSettings();
            var filter = new BsonDocument("filename", filename);
            // negative revisions map to 0, 1, 2, ... documents skipped in descending uploadDate order
            var skip = revision >= 0 ? revision : -revision - 1;
            var limit = 1;
            var sort = new BsonDocument("uploadDate", revision >= 0 ? 1 : -1);

            return new FindOperation<GridFSFileInfo<TFileId>>(
                collectionNamespace,
                _fileInfoSerializer,
                messageEncoderSettings)
            {
                Filter = filter,
                Limit = limit,
                ReadConcern = GetReadConcern(),
                Skip = skip,
                Sort = sort
            };
        }

        // Builds the find operation that fetches the single files collection document with the given _id.
        private FindOperation<GridFSFileInfo<TFileId>> CreateGetFileInfoOperation(TFileId id)
        {
            var filesCollectionNamespace = this.GetFilesCollectionNamespace();
            var messageEncoderSettings = this.GetMessageEncoderSettings();
            var filter = new BsonDocument("_id", _idSerializationInfo.SerializeValue(id));

            return new FindOperation<GridFSFileInfo<TFileId>>(
                filesCollectionNamespace,
                _fileInfoSerializer,
                messageEncoderSettings)
            {
                Filter = filter,
                Limit = 1,
                ReadConcern = GetReadConcern(),
                SingleBatch = true
            };
        }

        // Builds a find operation that returns at most one _id-only document from the files collection.
        private FindOperation<BsonDocument> CreateIsFilesCollectionEmptyOperation()
        {
            var filesCollectionNamespace = this.GetFilesCollectionNamespace();
            var messageEncoderSettings = this.GetMessageEncoderSettings();
            return new FindOperation<BsonDocument>(filesCollectionNamespace, BsonDocumentSerializer.Instance, messageEncoderSettings)
            {
                Limit = 1,
                ReadConcern = GetReadConcern(),
                SingleBatch = true,
                Projection = new BsonDocument("_id", 1)
            };
        }

        // Builds the list-indexes operation for the given collection.
        private ListIndexesOperation CreateListIndexesOperation(CollectionNamespace collectionNamespace)
        {
            var messageEncoderSettings
= this.GetMessageEncoderSettings();
            return new ListIndexesOperation(collectionNamespace, messageEncoderSettings);
        }

        // Builds the write operation that sets "filename" on the files collection document with the given _id.
        private BulkMixedWriteOperation CreateRenameOperation(TFileId id, string newFilename)
        {
            var filesCollectionNamespace = this.GetFilesCollectionNamespace();
            var filter = new BsonDocument("_id", _idSerializationInfo.SerializeValue(id));
            var update = new BsonDocument("$set", new BsonDocument("filename", newFilename));
            var requests = new[] { new UpdateRequest(UpdateType.Update, filter, update) };
            var messageEncoderSettings = this.GetMessageEncoderSettings();
            return new BulkMixedWriteOperation(filesCollectionNamespace, requests, messageEncoderSettings);
        }

        // Creates a forward-only upload stream that owns a fork of the given binding.
        private GridFSUploadStream<TFileId> CreateUploadStream(IReadWriteBindingHandle binding, TFileId id, string filename, GridFSUploadOptions options)
        {
            // CS0618 is the "member is obsolete" warning; presumably some option members read below are obsolete
#pragma warning disable 618
            var chunkSizeBytes = options.ChunkSizeBytes ?? _options.ChunkSizeBytes;
            // default batch size: the number of chunks that fit in 16 MiB
            var batchSize = options.BatchSize ?? (16 * 1024 * 1024 / chunkSizeBytes);

            return new GridFSForwardOnlyUploadStream<TFileId>(
                this,
                binding.Fork(),
                id,
                filename,
                options.Metadata,
                options.Aliases,
                options.ContentType,
                chunkSizeBytes,
                batchSize,
                options.DisableMD5);
#pragma warning restore
        }

        // Downloads the whole file into a new byte array sized from fileInfo.Length.
        private byte[] DownloadAsBytesHelper(IReadBindingHandle binding, GridFSFileInfo<TFileId> fileInfo, GridFSDownloadOptions options, CancellationToken cancellationToken = default(CancellationToken))
        {
            // a single byte[] cannot be longer than int.MaxValue
            if (fileInfo.Length > int.MaxValue)
            {
                throw new NotSupportedException("GridFS stored file is too large to be returned as a byte array.");
            }

            // the MemoryStream writes straight into the preallocated result array
            var bytes = new byte[(int)fileInfo.Length];
            using (var destination = new MemoryStream(bytes))
            {
                DownloadToStreamHelper(binding, fileInfo, destination, options, cancellationToken);
                return bytes;
            }
        }

        // Async counterpart of DownloadAsBytesHelper.
        private async Task<byte[]> DownloadAsBytesHelperAsync(IReadBindingHandle binding, GridFSFileInfo<TFileId> fileInfo, GridFSDownloadOptions options, CancellationToken cancellationToken = default(CancellationToken))
        {
            if (fileInfo.Length > int.MaxValue)
            {
                throw new NotSupportedException("GridFS stored file is too large to be returned as a byte array.");
            }

            var bytes = new byte[(int)fileInfo.Length];
            using (var destination = new MemoryStream(bytes))
            {
                await DownloadToStreamHelperAsync(binding, fileInfo, destination, options, cancellationToken).ConfigureAwait(false);
                return bytes;
            }
        }

        // Copies the stored file to the destination stream one chunk-sized buffer at a time.
        private void DownloadToStreamHelper(IReadBindingHandle binding, GridFSFileInfo<TFileId> fileInfo, Stream destination, GridFSDownloadOptions options, CancellationToken cancellationToken = default(CancellationToken))
        {
            var checkMD5 = options.CheckMD5 ?? false;

            // the source stream owns a fork of the binding and is disposed when the copy finishes
            using (var source = new GridFSForwardOnlyDownloadStream<TFileId>(this, binding.Fork(), fileInfo, checkMD5))
            {
                var count = source.Length;
                var buffer = new byte[fileInfo.ChunkSizeBytes];

                while (count > 0)
                {
                    // the last read may be shorter than a full buffer
                    var partialCount = (int)Math.Min(buffer.Length, count);
                    source.ReadBytes(buffer, 0, partialCount, cancellationToken);
                    destination.Write(buffer, 0, partialCount);
                    count -= partialCount;
                }

                // NOTE(review): DownloadToStreamHelperAsync explicitly closes the source before disposal;
                // confirm whether this synchronous path should do the same.
            }
        }

        // Async counterpart of DownloadToStreamHelper.
        private async Task DownloadToStreamHelperAsync(IReadBindingHandle binding, GridFSFileInfo<TFileId> fileInfo, Stream destination, GridFSDownloadOptions options, CancellationToken cancellationToken = default(CancellationToken))
        {
            var checkMD5 = options.CheckMD5 ??
false;

            using (var source = new GridFSForwardOnlyDownloadStream<TFileId>(this, binding.Fork(), fileInfo, checkMD5))
            {
                var count = source.Length;
                var buffer = new byte[fileInfo.ChunkSizeBytes];

                while (count > 0)
                {
                    // the last read may be shorter than a full buffer
                    var partialCount = (int)Math.Min(buffer.Length, count);
                    await source.ReadBytesAsync(buffer, 0, partialCount, cancellationToken).ConfigureAwait(false);
                    await destination.WriteAsync(buffer, 0, partialCount, cancellationToken).ConfigureAwait(false);
                    count -= partialCount;
                }

                await source.CloseAsync(cancellationToken).ConfigureAwait(false);
            }
        }

        // Runs the one-time index check: when the files collection is empty, creates whichever of the
        // files/chunks indexes is missing. The flag is only set after a pass completes without throwing.
        private void EnsureIndexes(IReadWriteBindingHandle binding, CancellationToken cancellationToken)
        {
            _ensureIndexesSemaphore.Wait(cancellationToken);
            try
            {
                if (!_ensureIndexesDone)
                {
                    var isFilesCollectionEmpty = IsFilesCollectionEmpty(binding, cancellationToken);
                    if (isFilesCollectionEmpty)
                    {
                        if (!FilesCollectionIndexesExist(binding, cancellationToken))
                        {
                            CreateFilesCollectionIndexes(binding, cancellationToken);
                        }
                        if (!ChunksCollectionIndexesExist(binding, cancellationToken))
                        {
                            CreateChunksCollectionIndexes(binding, cancellationToken);
                        }
                    }

                    _ensureIndexesDone = true;
                }
            }
            finally
            {
                _ensureIndexesSemaphore.Release();
            }
        }

        // Async counterpart of EnsureIndexes.
        private async Task EnsureIndexesAsync(IReadWriteBindingHandle binding, CancellationToken cancellationToken)
        {
            await _ensureIndexesSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
            try
            {
                if (!_ensureIndexesDone)
                {
                    var isFilesCollectionEmpty = await IsFilesCollectionEmptyAsync(binding, cancellationToken).ConfigureAwait(false);
                    if (isFilesCollectionEmpty)
                    {
                        if (!(await FilesCollectionIndexesExistAsync(binding, cancellationToken).ConfigureAwait(false)))
                        {
                            await CreateFilesCollectionIndexesAsync(binding, cancellationToken).ConfigureAwait(false);
                        }
                        if (!(await ChunksCollectionIndexesExistAsync(binding, cancellationToken).ConfigureAwait(false)))
                        {
                            await CreateChunksCollectionIndexesAsync(binding, cancellationToken).ConfigureAwait(false);
                        }
                    }

                    _ensureIndexesDone = true;
                }
            }
            finally
            {
_ensureIndexesSemaphore.Release();
            }
        }

        // Returns true when the listed indexes contain one keyed on { filename: 1, uploadDate: 1 }.
        private bool FilesCollectionIndexesExist(List<BsonDocument> indexes)
        {
            var key = new BsonDocument { { "filename", 1 }, { "uploadDate", 1 } };
            return IndexExists(indexes, key);
        }

        // Lists the files collection indexes and checks for the { filename: 1, uploadDate: 1 } index.
        private bool FilesCollectionIndexesExist(IReadBindingHandle binding, CancellationToken cancellationToken)
        {
            var indexes = ListIndexes(binding, this.GetFilesCollectionNamespace(), cancellationToken);
            return FilesCollectionIndexesExist(indexes);
        }

        // Async counterpart of FilesCollectionIndexesExist(binding, cancellationToken).
        private async Task<bool> FilesCollectionIndexesExistAsync(IReadBindingHandle binding, CancellationToken cancellationToken)
        {
            var indexes = await ListIndexesAsync(binding, this.GetFilesCollectionNamespace(), cancellationToken).ConfigureAwait(false);
            return FilesCollectionIndexesExist(indexes);
        }

        // Fetches the file info for the given id; throws GridFSFileNotFoundException when there is none.
        private GridFSFileInfo<TFileId> GetFileInfo(IReadBindingHandle binding, TFileId id, CancellationToken cancellationToken)
        {
            var operation = CreateGetFileInfoOperation(id);
            using (var cursor = operation.Execute(binding, cancellationToken))
            {
                var fileInfo = cursor.FirstOrDefault(cancellationToken);
                if (fileInfo == null)
                {
                    throw new GridFSFileNotFoundException(_idSerializationInfo.SerializeValue(id));
                }
                return fileInfo;
            }
        }

        // Async counterpart of GetFileInfo.
        private async Task<GridFSFileInfo<TFileId>> GetFileInfoAsync(IReadBindingHandle binding, TFileId id, CancellationToken cancellationToken)
        {
            var operation = CreateGetFileInfoOperation(id);
            using (var cursor = await operation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false))
            {
                var fileInfo = await cursor.FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
                if (fileInfo == null)
                {
                    throw new GridFSFileNotFoundException(_idSerializationInfo.SerializeValue(id));
                }
                return fileInfo;
            }
        }

        // Fetches the file info for one revision of a filename; throws GridFSFileNotFoundException when there is none.
        private GridFSFileInfo<TFileId> GetFileInfoByName(IReadBindingHandle binding, string filename, int revision, CancellationToken cancellationToken)
        {
            var operation = CreateGetFileInfoByNameOperation(filename, revision);
            using (var cursor = operation.Execute(binding, cancellationToken))
            {
                var fileInfo = cursor.FirstOrDefault(cancellationToken);
                if (fileInfo == null)
                {
throw new GridFSFileNotFoundException(filename, revision); } return fileInfo; } } private async Task> GetFileInfoByNameAsync(IReadBindingHandle binding, string filename, int revision, CancellationToken cancellationToken) { var operation = CreateGetFileInfoByNameOperation(filename, revision); using (var cursor = await operation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false)) { var fileInfo = await cursor.FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false); if (fileInfo == null) { throw new GridFSFileNotFoundException(filename, revision); } return fileInfo; } } private ReadConcern GetReadConcern() { return _options.ReadConcern ?? _database.Settings.ReadConcern; } private IReadBindingHandle GetSingleServerReadBinding(CancellationToken cancellationToken) { var readPreference = _options.ReadPreference ?? _database.Settings.ReadPreference; var selector = new ReadPreferenceServerSelector(readPreference); var server = _cluster.SelectServer(selector, cancellationToken); var binding = new SingleServerReadBinding(server, readPreference, NoCoreSession.NewHandle()); return new ReadBindingHandle(binding); } private async Task GetSingleServerReadBindingAsync(CancellationToken cancellationToken) { var readPreference = _options.ReadPreference ?? 
_database.Settings.ReadPreference; var selector = new ReadPreferenceServerSelector(readPreference); var server = await _cluster.SelectServerAsync(selector, cancellationToken).ConfigureAwait(false); var binding = new SingleServerReadBinding(server, readPreference, NoCoreSession.NewHandle()); return new ReadBindingHandle(binding); } private IReadWriteBindingHandle GetSingleServerReadWriteBinding(CancellationToken cancellationToken) { var selector = WritableServerSelector.Instance; var server = _cluster.SelectServer(selector, cancellationToken); var binding = new SingleServerReadWriteBinding(server, NoCoreSession.NewHandle()); return new ReadWriteBindingHandle(binding); } private async Task GetSingleServerReadWriteBindingAsync(CancellationToken cancellationToken) { var selector = WritableServerSelector.Instance; var server = await _cluster.SelectServerAsync(selector, cancellationToken).ConfigureAwait(false); var binding = new SingleServerReadWriteBinding(server, NoCoreSession.NewHandle()); return new ReadWriteBindingHandle(binding); } private bool IndexExists(List indexes, BsonDocument key) { foreach (var index in indexes) { if (index["key"].Equals(key)) { return true; } } return false; } private bool IsFilesCollectionEmpty(IReadWriteBindingHandle binding, CancellationToken cancellationToken) { var operation = CreateIsFilesCollectionEmptyOperation(); using (var cursor = operation.Execute(binding, cancellationToken)) { var firstOrDefault = cursor.FirstOrDefault(cancellationToken); return firstOrDefault == null; } } private async Task IsFilesCollectionEmptyAsync(IReadWriteBindingHandle binding, CancellationToken cancellationToken) { var operation = CreateIsFilesCollectionEmptyOperation(); using (var cursor = await operation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false)) { var firstOrDefault = await cursor.FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false); return firstOrDefault == null; } } private List ListIndexes(IReadBinding binding, 
CollectionNamespace collectionNamespace, CancellationToken cancellationToken) { var operation = CreateListIndexesOperation(collectionNamespace); return operation.Execute(binding, cancellationToken).ToList(); } private async Task> ListIndexesAsync(IReadBinding binding, CollectionNamespace collectionNamespace, CancellationToken cancellationToken) { var operation = CreateListIndexesOperation(collectionNamespace); var cursor = await operation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false); return await cursor.ToListAsync(cancellationToken).ConfigureAwait(false); } } }