| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027 |
- /* Copyright 2016-present MongoDB Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- using System;
- using System.Collections.Generic;
- using System.Diagnostics.CodeAnalysis;
- using System.IO;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- using MongoDB.Bson;
- using MongoDB.Bson.IO;
- using MongoDB.Bson.Serialization;
- using MongoDB.Bson.Serialization.Serializers;
- using MongoDB.Driver.Core.Bindings;
- using MongoDB.Driver.Core.Clusters;
- using MongoDB.Driver.Core.Clusters.ServerSelectors;
- using MongoDB.Driver.Core.Misc;
- using MongoDB.Driver.Core.Operations;
- using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
- namespace MongoDB.Driver.GridFS
- {
- /// <summary>
- /// Represents a GridFS bucket.
- /// </summary>
- /// <typeparam name="TFileId">The type of the file identifier.</typeparam>
- [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] // we can get away with not calling Dispose on our SemaphoreSlim
- public class GridFSBucket<TFileId> : IGridFSBucket<TFileId>
- {
- // fields
- // The cluster of the database's client; captured in the constructor from database.Client.Cluster.
- private readonly ICluster _cluster;
- // The database that holds this bucket's collections; exposed through the Database property.
- private readonly IMongoDatabase _database;
- // NOTE(review): presumably set once the index check for this bucket has completed; the code that
- // reads and writes this flag is outside the visible part of the file - confirm.
- private bool _ensureIndexesDone;
- // Created with an initial count of 1. It is never disposed (see the SuppressMessage on the class).
- // NOTE(review): presumably serializes the index check guarded by _ensureIndexesDone - confirm.
- private SemaphoreSlim _ensureIndexesSemaphore = new SemaphoreSlim(1);
- // Serializer passed to the find operations that read GridFSFileInfo documents from the files collection.
- private readonly IBsonSerializer<GridFSFileInfo<TFileId>> _fileInfoSerializer;
- // Serialization info for the "_id" element; used to turn a TFileId into the BsonValue placed in
- // filters and in GridFSFileNotFoundException.
- private readonly BsonSerializationInfo _idSerializationInfo;
- // Immutable snapshot of the options given to the constructor (or the defaults when none were given).
- private readonly ImmutableGridFSBucketOptions _options;
- // constructors
- /// <summary>
- /// Initializes a new instance of the <see cref="GridFSBucket{TFileId}" /> class.
- /// </summary>
- /// <param name="database">The database that holds the bucket's collections. Must not be null.</param>
- /// <param name="options">The bucket options, or null to use <see cref="ImmutableGridFSBucketOptions.Defaults" />.</param>
- public GridFSBucket(IMongoDatabase database, GridFSBucketOptions options = null)
- {
-     _database = Ensure.IsNotNull(database, nameof(database));
-     // take an immutable snapshot of the caller's options so later changes to them cannot affect the bucket
-     if (options == null)
-     {
-         _options = ImmutableGridFSBucketOptions.Defaults;
-     }
-     else
-     {
-         _options = new ImmutableGridFSBucketOptions(options);
-     }
-     _cluster = database.Client.Cluster;
-     // the same id serializer backs both the "_id" serialization info and the file info serializer
-     var fileIdSerializer = _options.SerializerRegistry.GetSerializer<TFileId>();
-     _idSerializationInfo = new BsonSerializationInfo("_id", fileIdSerializer, typeof(TFileId));
-     _fileInfoSerializer = new GridFSFileInfoSerializer<TFileId>(fileIdSerializer);
- }
- // properties
- /// <inheritdoc />
- public IMongoDatabase Database => _database;
- /// <inheritdoc />
- public ImmutableGridFSBucketOptions Options => _options;
- // methods
- /// <inheritdoc />
- public void Delete(TFileId id, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     using (var readWriteBinding = GetSingleServerReadWriteBinding(cancellationToken))
-     {
-         // delete the files collection document first and remember the outcome
-         var fileDeleteResult = CreateDeleteFileOperation(id).Execute(readWriteBinding, cancellationToken);
-         // the chunks are deleted regardless of whether a files collection document was found
-         CreateDeleteChunksOperation(id).Execute(readWriteBinding, cancellationToken);
-         var fileWasDeleted = fileDeleteResult.DeletedCount != 0;
-         if (!fileWasDeleted)
-         {
-             throw new GridFSFileNotFoundException(_idSerializationInfo.SerializeValue(id));
-         }
-     }
- }
- /// <inheritdoc />
- public async Task DeleteAsync(TFileId id, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     using (var readWriteBinding = await GetSingleServerReadWriteBindingAsync(cancellationToken).ConfigureAwait(false))
-     {
-         // delete the files collection document first and remember the outcome
-         var fileDeleteResult = await CreateDeleteFileOperation(id).ExecuteAsync(readWriteBinding, cancellationToken).ConfigureAwait(false);
-         // the chunks are deleted regardless of whether a files collection document was found
-         await CreateDeleteChunksOperation(id).ExecuteAsync(readWriteBinding, cancellationToken).ConfigureAwait(false);
-         var fileWasDeleted = fileDeleteResult.DeletedCount != 0;
-         if (!fileWasDeleted)
-         {
-             throw new GridFSFileNotFoundException(_idSerializationInfo.SerializeValue(id));
-         }
-     }
- }
- /// <inheritdoc />
- public byte[] DownloadAsBytes(TFileId id, GridFSDownloadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     var effectiveOptions = options ?? new GridFSDownloadOptions();
-     using (var readBinding = GetSingleServerReadBinding(cancellationToken))
-     {
-         // resolve the file info for the id, then read the whole file through the same binding
-         var info = GetFileInfo(readBinding, id, cancellationToken);
-         var contents = DownloadAsBytesHelper(readBinding, info, effectiveOptions, cancellationToken);
-         return contents;
-     }
- }
- /// <inheritdoc />
- public async Task<byte[]> DownloadAsBytesAsync(TFileId id, GridFSDownloadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     var effectiveOptions = options ?? new GridFSDownloadOptions();
-     using (var readBinding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
-     {
-         // resolve the file info for the id, then read the whole file through the same binding
-         var info = await GetFileInfoAsync(readBinding, id, cancellationToken).ConfigureAwait(false);
-         var contents = await DownloadAsBytesHelperAsync(readBinding, info, effectiveOptions, cancellationToken).ConfigureAwait(false);
-         return contents;
-     }
- }
- /// <inheritdoc />
- public byte[] DownloadAsBytesByName(string filename, GridFSDownloadByNameOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull(filename, nameof(filename));
-     var effectiveOptions = options ?? new GridFSDownloadByNameOptions();
-     using (var readBinding = GetSingleServerReadBinding(cancellationToken))
-     {
-         // the requested revision selects which of the files sharing this name is downloaded
-         var info = GetFileInfoByName(readBinding, filename, effectiveOptions.Revision, cancellationToken);
-         var contents = DownloadAsBytesHelper(readBinding, info, effectiveOptions, cancellationToken);
-         return contents;
-     }
- }
- /// <inheritdoc />
- public async Task<byte[]> DownloadAsBytesByNameAsync(string filename, GridFSDownloadByNameOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull(filename, nameof(filename));
-     var effectiveOptions = options ?? new GridFSDownloadByNameOptions();
-     using (var readBinding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
-     {
-         // the requested revision selects which of the files sharing this name is downloaded
-         var info = await GetFileInfoByNameAsync(readBinding, filename, effectiveOptions.Revision, cancellationToken).ConfigureAwait(false);
-         var contents = await DownloadAsBytesHelperAsync(readBinding, info, effectiveOptions, cancellationToken).ConfigureAwait(false);
-         return contents;
-     }
- }
- /// <inheritdoc />
- public void DownloadToStream(TFileId id, Stream destination, GridFSDownloadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     Ensure.IsNotNull(destination, nameof(destination));
-     if (options == null)
-     {
-         options = new GridFSDownloadOptions();
-     }
-     using (var readBinding = GetSingleServerReadBinding(cancellationToken))
-     {
-         // resolve the file info for the id, then copy the file into the caller's stream
-         var info = GetFileInfo(readBinding, id, cancellationToken);
-         DownloadToStreamHelper(readBinding, info, destination, options, cancellationToken);
-     }
- }
- /// <inheritdoc />
- public async Task DownloadToStreamAsync(TFileId id, Stream destination, GridFSDownloadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     Ensure.IsNotNull(destination, nameof(destination));
-     if (options == null)
-     {
-         options = new GridFSDownloadOptions();
-     }
-     using (var readBinding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
-     {
-         // resolve the file info for the id, then copy the file into the caller's stream
-         var info = await GetFileInfoAsync(readBinding, id, cancellationToken).ConfigureAwait(false);
-         await DownloadToStreamHelperAsync(readBinding, info, destination, options, cancellationToken).ConfigureAwait(false);
-     }
- }
- /// <inheritdoc />
- public void DownloadToStreamByName(string filename, Stream destination, GridFSDownloadByNameOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull(filename, nameof(filename));
-     Ensure.IsNotNull(destination, nameof(destination));
-     if (options == null)
-     {
-         options = new GridFSDownloadByNameOptions();
-     }
-     using (var readBinding = GetSingleServerReadBinding(cancellationToken))
-     {
-         // the requested revision selects which of the files sharing this name is copied
-         var info = GetFileInfoByName(readBinding, filename, options.Revision, cancellationToken);
-         DownloadToStreamHelper(readBinding, info, destination, options, cancellationToken);
-     }
- }
- /// <inheritdoc />
- public async Task DownloadToStreamByNameAsync(string filename, Stream destination, GridFSDownloadByNameOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull(filename, nameof(filename));
-     Ensure.IsNotNull(destination, nameof(destination));
-     if (options == null)
-     {
-         options = new GridFSDownloadByNameOptions();
-     }
-     using (var readBinding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
-     {
-         // the requested revision selects which of the files sharing this name is copied
-         var info = await GetFileInfoByNameAsync(readBinding, filename, options.Revision, cancellationToken).ConfigureAwait(false);
-         await DownloadToStreamHelperAsync(readBinding, info, destination, options, cancellationToken).ConfigureAwait(false);
-     }
- }
- /// <inheritdoc />
- public void Drop(CancellationToken cancellationToken = default(CancellationToken))
- {
-     var filesNamespace = this.GetFilesCollectionNamespace();
-     var chunksNamespace = this.GetChunksCollectionNamespace();
-     var encoderSettings = this.GetMessageEncoderSettings();
-     using (var readWriteBinding = GetSingleServerReadWriteBinding(cancellationToken))
-     {
-         // drop the files collection first, then the chunks collection, over the same binding
-         foreach (var collectionNamespace in new[] { filesNamespace, chunksNamespace })
-         {
-             var dropOperation = CreateDropCollectionOperation(collectionNamespace, encoderSettings);
-             dropOperation.Execute(readWriteBinding, cancellationToken);
-         }
-     }
- }
- /// <inheritdoc />
- public async Task DropAsync(CancellationToken cancellationToken = default(CancellationToken))
- {
-     var filesNamespace = this.GetFilesCollectionNamespace();
-     var chunksNamespace = this.GetChunksCollectionNamespace();
-     var encoderSettings = this.GetMessageEncoderSettings();
-     using (var readWriteBinding = await GetSingleServerReadWriteBindingAsync(cancellationToken).ConfigureAwait(false))
-     {
-         // drop the files collection first, then the chunks collection, over the same binding
-         foreach (var collectionNamespace in new[] { filesNamespace, chunksNamespace })
-         {
-             var dropOperation = CreateDropCollectionOperation(collectionNamespace, encoderSettings);
-             await dropOperation.ExecuteAsync(readWriteBinding, cancellationToken).ConfigureAwait(false);
-         }
-     }
- }
- /// <inheritdoc />
- public IAsyncCursor<GridFSFileInfo<TFileId>> Find(FilterDefinition<GridFSFileInfo<TFileId>> filter, GridFSFindOptions<TFileId> options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull(filter, nameof(filter));
-     // the operation is built before a binding is acquired
-     var findOperation = CreateFindOperation(filter, options ?? new GridFSFindOptions<TFileId>());
-     using (var readBinding = GetSingleServerReadBinding(cancellationToken))
-     {
-         var cursor = findOperation.Execute(readBinding, cancellationToken);
-         return cursor;
-     }
- }
- /// <inheritdoc />
- public async Task<IAsyncCursor<GridFSFileInfo<TFileId>>> FindAsync(FilterDefinition<GridFSFileInfo<TFileId>> filter, GridFSFindOptions<TFileId> options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull(filter, nameof(filter));
-     // the operation is built before a binding is acquired
-     var findOperation = CreateFindOperation(filter, options ?? new GridFSFindOptions<TFileId>());
-     using (var readBinding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
-     {
-         var cursor = await findOperation.ExecuteAsync(readBinding, cancellationToken).ConfigureAwait(false);
-         return cursor;
-     }
- }
- /// <inheritdoc />
- public GridFSDownloadStream<TFileId> OpenDownloadStream(TFileId id, GridFSDownloadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     var effectiveOptions = options ?? new GridFSDownloadOptions();
-     using (var readBinding = GetSingleServerReadBinding(cancellationToken))
-     {
-         var info = GetFileInfo(readBinding, id, cancellationToken);
-         // the stream receives a fork of the binding because this method disposes its own handle on return
-         var streamBinding = readBinding.Fork();
-         return CreateDownloadStream(streamBinding, info, effectiveOptions, cancellationToken);
-     }
- }
- /// <inheritdoc />
- public async Task<GridFSDownloadStream<TFileId>> OpenDownloadStreamAsync(TFileId id, GridFSDownloadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     var effectiveOptions = options ?? new GridFSDownloadOptions();
-     using (var readBinding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
-     {
-         var info = await GetFileInfoAsync(readBinding, id, cancellationToken).ConfigureAwait(false);
-         // the stream receives a fork of the binding because this method disposes its own handle on return
-         var streamBinding = readBinding.Fork();
-         return CreateDownloadStream(streamBinding, info, effectiveOptions, cancellationToken);
-     }
- }
- /// <inheritdoc />
- public GridFSDownloadStream<TFileId> OpenDownloadStreamByName(string filename, GridFSDownloadByNameOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull(filename, nameof(filename));
-     options = options ?? new GridFSDownloadByNameOptions();
-     using (var binding = GetSingleServerReadBinding(cancellationToken))
-     {
-         // the requested revision selects which of the files sharing this name is opened
-         var fileInfo = GetFileInfoByName(binding, filename, options.Revision, cancellationToken);
-         // forward the caller's token, exactly as OpenDownloadStream does, so that both entry points
-         // call CreateDownloadStream the same way; the stream gets a fork because this method's own
-         // binding handle is disposed on return
-         return CreateDownloadStream(binding.Fork(), fileInfo, options, cancellationToken);
-     }
- }
- /// <inheritdoc />
- public async Task<GridFSDownloadStream<TFileId>> OpenDownloadStreamByNameAsync(string filename, GridFSDownloadByNameOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull(filename, nameof(filename));
-     options = options ?? new GridFSDownloadByNameOptions();
-     using (var binding = await GetSingleServerReadBindingAsync(cancellationToken).ConfigureAwait(false))
-     {
-         // the requested revision selects which of the files sharing this name is opened
-         var fileInfo = await GetFileInfoByNameAsync(binding, filename, options.Revision, cancellationToken).ConfigureAwait(false);
-         // forward the caller's token, exactly as OpenDownloadStreamAsync does, so that both entry points
-         // call CreateDownloadStream the same way; the stream gets a fork because this method's own
-         // binding handle is disposed on return
-         return CreateDownloadStream(binding.Fork(), fileInfo, options, cancellationToken);
-     }
- }
- /// <inheritdoc />
- public GridFSUploadStream<TFileId> OpenUploadStream(TFileId id, string filename, GridFSUploadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     Ensure.IsNotNull(filename, nameof(filename));
-     var effectiveOptions = options ?? new GridFSUploadOptions();
-     using (var readWriteBinding = GetSingleServerReadWriteBinding(cancellationToken))
-     {
-         // EnsureIndexes runs before the upload stream is created
-         EnsureIndexes(readWriteBinding, cancellationToken);
-         var uploadStream = CreateUploadStream(readWriteBinding, id, filename, effectiveOptions);
-         return uploadStream;
-     }
- }
- /// <inheritdoc />
- public async Task<GridFSUploadStream<TFileId>> OpenUploadStreamAsync(TFileId id, string filename, GridFSUploadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     Ensure.IsNotNull(filename, nameof(filename));
-     var effectiveOptions = options ?? new GridFSUploadOptions();
-     using (var readWriteBinding = await GetSingleServerReadWriteBindingAsync(cancellationToken).ConfigureAwait(false))
-     {
-         // EnsureIndexesAsync runs before the upload stream is created
-         await EnsureIndexesAsync(readWriteBinding, cancellationToken).ConfigureAwait(false);
-         var uploadStream = CreateUploadStream(readWriteBinding, id, filename, effectiveOptions);
-         return uploadStream;
-     }
- }
- /// <inheritdoc />
- public void Rename(TFileId id, string newFilename, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     Ensure.IsNotNull(newFilename, nameof(newFilename));
-     var renameOperation = CreateRenameOperation(id, newFilename);
-     using (var binding = GetSingleServerReadWriteBinding(cancellationToken))
-     {
-         var result = renameOperation.Execute(binding, cancellationToken);
-         // Decide "file not found" from the number of documents the _id filter matched, not from the
-         // number of documents modified: renaming a file to the name it already has matches one
-         // document but modifies none, and the modified count is not reported by every server.
-         if (result.MatchedCount == 0)
-         {
-             throw new GridFSFileNotFoundException(_idSerializationInfo.SerializeValue(id));
-         }
-     }
- }
- /// <inheritdoc />
- public async Task RenameAsync(TFileId id, string newFilename, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     Ensure.IsNotNull(newFilename, nameof(newFilename));
-     var renameOperation = CreateRenameOperation(id, newFilename);
-     using (var binding = await GetSingleServerReadWriteBindingAsync(cancellationToken).ConfigureAwait(false))
-     {
-         var result = await renameOperation.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);
-         // Decide "file not found" from the number of documents the _id filter matched, not from the
-         // number of documents modified: renaming a file to the name it already has matches one
-         // document but modifies none, and the modified count is not reported by every server.
-         if (result.MatchedCount == 0)
-         {
-             throw new GridFSFileNotFoundException(_idSerializationInfo.SerializeValue(id));
-         }
-     }
- }
- /// <inheritdoc />
- public void UploadFromBytes(TFileId id, string filename, byte[] source, GridFSUploadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     Ensure.IsNotNull(filename, nameof(filename));
-     Ensure.IsNotNull(source, nameof(source));
-     var effectiveOptions = options ?? new GridFSUploadOptions();
-     // wrap the array in a stream and reuse the stream-based upload
-     using (var bytesStream = new MemoryStream(source))
-     {
-         UploadFromStream(id, filename, bytesStream, effectiveOptions, cancellationToken);
-     }
- }
- /// <inheritdoc />
- public async Task UploadFromBytesAsync(TFileId id, string filename, byte[] source, GridFSUploadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     Ensure.IsNotNull(filename, nameof(filename));
-     Ensure.IsNotNull(source, nameof(source));
-     var effectiveOptions = options ?? new GridFSUploadOptions();
-     // wrap the array in a stream and reuse the stream-based upload
-     using (var bytesStream = new MemoryStream(source))
-     {
-         await UploadFromStreamAsync(id, filename, bytesStream, effectiveOptions, cancellationToken).ConfigureAwait(false);
-     }
- }
- /// <inheritdoc />
- public void UploadFromStream(TFileId id, string filename, Stream source, GridFSUploadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     Ensure.IsNotNull(filename, nameof(filename));
-     Ensure.IsNotNull(source, nameof(source));
-     if (options == null)
-     {
-         options = new GridFSUploadOptions();
-     }
-     using (var uploadStream = OpenUploadStream(id, filename, options, cancellationToken))
-     {
-         // the copy buffer is one chunk long; per-upload options take precedence over the bucket options
-         var copyBuffer = new byte[options.ChunkSizeBytes ?? _options.ChunkSizeBytes];
-         for (;;)
-         {
-             int readCount;
-             try
-             {
-                 readCount = source.Read(copyBuffer, 0, copyBuffer.Length);
-             }
-             catch
-             {
-                 // reading the source failed: abort the upload, then let the read failure propagate
-                 try
-                 {
-                     uploadStream.Abort();
-                 }
-                 catch
-                 {
-                     // ignore any exceptions because we're going to rethrow the original exception
-                 }
-                 throw;
-             }
-             if (readCount == 0)
-             {
-                 // a zero-length read marks the end of the source
-                 break;
-             }
-             uploadStream.Write(copyBuffer, 0, readCount);
-         }
-         uploadStream.Close(cancellationToken);
-     }
- }
- /// <inheritdoc />
- public async Task UploadFromStreamAsync(TFileId id, string filename, Stream source, GridFSUploadOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
- {
-     Ensure.IsNotNull((object)id, nameof(id));
-     Ensure.IsNotNull(filename, nameof(filename));
-     Ensure.IsNotNull(source, nameof(source));
-     options = options ?? new GridFSUploadOptions();
-     using (var destination = await OpenUploadStreamAsync(id, filename, options, cancellationToken).ConfigureAwait(false))
-     {
-         // the copy buffer is one chunk long; per-upload options take precedence over the bucket options
-         var chunkSizeBytes = options.ChunkSizeBytes ?? _options.ChunkSizeBytes;
-         var buffer = new byte[chunkSizeBytes];
-         while (true)
-         {
-             int bytesRead;
-             try
-             {
-                 bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
-             }
-             catch
-             {
-                 // reading the source failed: abort the upload, then rethrow with "throw;" so that the
-                 // original exception keeps its stack trace (same shape as the synchronous UploadFromStream)
-                 try
-                 {
-                     await destination.AbortAsync().ConfigureAwait(false);
-                 }
-                 catch
-                 {
-                     // ignore any exceptions because we're going to rethrow the original exception
-                 }
-                 throw;
-             }
-             if (bytesRead == 0)
-             {
-                 // a zero-length read marks the end of the source
-                 break;
-             }
-             await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
-         }
-         await destination.CloseAsync(cancellationToken).ConfigureAwait(false);
-     }
- }
- // private methods
- // Checks the given index descriptions for an index keyed on { files_id : 1, n : 1 }.
- private bool ChunksCollectionIndexesExist(List<BsonDocument> indexes)
- {
-     var expectedKey = new BsonDocument("files_id", 1).Add("n", 1);
-     var found = IndexExists(indexes, expectedKey);
-     return found;
- }
- // Lists the indexes of the chunks collection and checks them for the expected chunks index.
- private bool ChunksCollectionIndexesExist(IReadBindingHandle binding, CancellationToken cancellationToken)
- {
-     var chunksNamespace = this.GetChunksCollectionNamespace();
-     return ChunksCollectionIndexesExist(ListIndexes(binding, chunksNamespace, cancellationToken));
- }
- // Lists the indexes of the chunks collection and checks them for the expected chunks index.
- private async Task<bool> ChunksCollectionIndexesExistAsync(IReadBindingHandle binding, CancellationToken cancellationToken)
- {
-     var chunksNamespace = this.GetChunksCollectionNamespace();
-     var existingIndexes = await ListIndexesAsync(binding, chunksNamespace, cancellationToken).ConfigureAwait(false);
-     return ChunksCollectionIndexesExist(existingIndexes);
- }
- // Builds the create-indexes operation for the chunks collection and runs it on the given binding.
- private void CreateChunksCollectionIndexes(IReadWriteBindingHandle binding, CancellationToken cancellationToken)
- {
-     CreateCreateChunksCollectionIndexesOperation().Execute(binding, cancellationToken);
- }
- // Builds the create-indexes operation for the chunks collection and runs it on the given binding.
- private async Task CreateChunksCollectionIndexesAsync(IReadWriteBindingHandle binding, CancellationToken cancellationToken)
- {
-     var createIndexesOperation = CreateCreateChunksCollectionIndexesOperation();
-     var pending = createIndexesOperation.ExecuteAsync(binding, cancellationToken);
-     await pending.ConfigureAwait(false);
- }
- // Builds the operation that creates the unique { files_id : 1, n : 1 } index on the chunks collection.
- internal CreateIndexesOperation CreateCreateChunksCollectionIndexesOperation()
- {
-     var chunksNamespace = this.GetChunksCollectionNamespace();
-     var indexKeys = new BsonDocument { { "files_id", 1 }, { "n", 1 } };
-     var uniqueIndexRequest = new CreateIndexRequest(indexKeys) { Unique = true };
-     var encoderSettings = this.GetMessageEncoderSettings();
-     var operation = new CreateIndexesOperation(chunksNamespace, new[] { uniqueIndexRequest }, encoderSettings);
-     // the bucket's write concern wins; otherwise fall back to the database's
-     operation.WriteConcern = _options.WriteConcern ?? _database.Settings.WriteConcern;
-     return operation;
- }
- // Builds the operation that creates the { filename : 1, uploadDate : 1 } index on the files collection.
- internal CreateIndexesOperation CreateCreateFilesCollectionIndexesOperation()
- {
-     var filesNamespace = this.GetFilesCollectionNamespace();
-     var indexKeys = new BsonDocument { { "filename", 1 }, { "uploadDate", 1 } };
-     var indexRequest = new CreateIndexRequest(indexKeys);
-     var encoderSettings = this.GetMessageEncoderSettings();
-     var operation = new CreateIndexesOperation(filesNamespace, new[] { indexRequest }, encoderSettings);
-     // the bucket's write concern wins; otherwise fall back to the database's
-     operation.WriteConcern = _options.WriteConcern ?? _database.Settings.WriteConcern;
-     return operation;
- }
- // Builds the bulk write that deletes the chunks whose files_id equals the given id.
- private BulkMixedWriteOperation CreateDeleteChunksOperation(TFileId id)
- {
-     var serializedId = _idSerializationInfo.SerializeValue(id);
-     var deleteChunksRequest = new DeleteRequest(new BsonDocument("files_id", serializedId)) { Limit = 0 };
-     var chunksNamespace = this.GetChunksCollectionNamespace();
-     var encoderSettings = this.GetMessageEncoderSettings();
-     return new BulkMixedWriteOperation(chunksNamespace, new[] { deleteChunksRequest }, encoderSettings);
- }
- // Creates a seekable or a forward-only download stream, depending on the options.
- // CheckMD5 and Seekable both default to false when not set and cannot both be true.
- // The cancellationToken parameter is not used by this method.
- private GridFSDownloadStream<TFileId> CreateDownloadStream(IReadBindingHandle binding, GridFSFileInfo<TFileId> fileInfo, GridFSDownloadOptions options, CancellationToken cancellationToken = default(CancellationToken))
- {
-     var verifyMD5 = options.CheckMD5 ?? false;
-     var wantsSeekable = options.Seekable ?? false;
-     if (!wantsSeekable)
-     {
-         return new GridFSForwardOnlyDownloadStream<TFileId>(this, binding, fileInfo, verifyMD5);
-     }
-     if (verifyMD5)
-     {
-         throw new ArgumentException("CheckMD5 can only be used when Seekable is false.");
-     }
-     return new GridFSSeekableDownloadStream<TFileId>(this, binding, fileInfo);
- }
- // Builds a drop-collection operation for the given namespace.
- internal DropCollectionOperation CreateDropCollectionOperation(CollectionNamespace collectionNamespace, MessageEncoderSettings messageEncoderSettings)
- {
-     var dropOperation = new DropCollectionOperation(collectionNamespace, messageEncoderSettings);
-     // the bucket's write concern wins; otherwise fall back to the database's
-     dropOperation.WriteConcern = _options.WriteConcern ?? _database.Settings.WriteConcern;
-     return dropOperation;
- }
- // Builds the bulk write that deletes the files collection document whose _id equals the given id.
- private BulkMixedWriteOperation CreateDeleteFileOperation(TFileId id)
- {
-     var serializedId = _idSerializationInfo.SerializeValue(id);
-     var deleteFileRequest = new DeleteRequest(new BsonDocument("_id", serializedId));
-     var filesNamespace = this.GetFilesCollectionNamespace();
-     var encoderSettings = this.GetMessageEncoderSettings();
-     return new BulkMixedWriteOperation(filesNamespace, new[] { deleteFileRequest }, encoderSettings);
- }
- // Builds the create-indexes operation for the files collection and runs it on the given binding.
- private void CreateFilesCollectionIndexes(IReadWriteBindingHandle binding, CancellationToken cancellationToken)
- {
-     CreateCreateFilesCollectionIndexesOperation().Execute(binding, cancellationToken);
- }
- // Builds the create-indexes operation for the files collection and runs it on the given binding.
- private async Task CreateFilesCollectionIndexesAsync(IReadWriteBindingHandle binding, CancellationToken cancellationToken)
- {
-     var createIndexesOperation = CreateCreateFilesCollectionIndexesOperation();
-     var pending = createIndexesOperation.ExecuteAsync(binding, cancellationToken);
-     await pending.ConfigureAwait(false);
- }
- // Translates a caller-supplied filter and find options into a find operation on the files collection.
- private FindOperation<GridFSFileInfo<TFileId>> CreateFindOperation(FilterDefinition<GridFSFileInfo<TFileId>> filter, GridFSFindOptions<TFileId> options)
- {
-     var filesNamespace = this.GetFilesCollectionNamespace();
-     var encoderSettings = this.GetMessageEncoderSettings();
-     var serializerRegistry = _options.SerializerRegistry;
-     var filterDocument = filter.Render(_fileInfoSerializer, serializerRegistry);
-     // a sort is optional; when none was given the operation's Sort stays null
-     var sortDocument = options.Sort?.Render(_fileInfoSerializer, serializerRegistry);
-     var findOperation = new FindOperation<GridFSFileInfo<TFileId>>(filesNamespace, _fileInfoSerializer, encoderSettings);
-     findOperation.BatchSize = options.BatchSize;
-     findOperation.Filter = filterDocument;
-     findOperation.Limit = options.Limit;
-     findOperation.MaxTime = options.MaxTime;
-     findOperation.NoCursorTimeout = options.NoCursorTimeout ?? false;
-     findOperation.ReadConcern = GetReadConcern();
-     findOperation.Skip = options.Skip;
-     findOperation.Sort = sortDocument;
-     return findOperation;
- }
- // Builds the find operation that selects one revision of the files stored under the given name.
- // A non-negative revision sorts by uploadDate ascending and skips that many documents (0 = earliest);
- // a negative revision sorts descending and skips -revision - 1 documents (-1 = most recent).
- private FindOperation<GridFSFileInfo<TFileId>> CreateGetFileInfoByNameOperation(string filename, int revision)
- {
-     var filesNamespace = this.GetFilesCollectionNamespace();
-     var encoderSettings = this.GetMessageEncoderSettings();
-     var countFromOldest = revision >= 0;
-     var documentsToSkip = countFromOldest ? revision : -revision - 1;
-     var sortDirection = countFromOldest ? 1 : -1;
-     var findOperation = new FindOperation<GridFSFileInfo<TFileId>>(filesNamespace, _fileInfoSerializer, encoderSettings);
-     findOperation.Filter = new BsonDocument("filename", filename);
-     findOperation.Limit = 1;
-     findOperation.ReadConcern = GetReadConcern();
-     findOperation.Skip = documentsToSkip;
-     findOperation.Sort = new BsonDocument("uploadDate", sortDirection);
-     return findOperation;
- }
- // Builds the single-batch find operation that looks up the files collection document with the given _id.
- private FindOperation<GridFSFileInfo<TFileId>> CreateGetFileInfoOperation(TFileId id)
- {
-     var filesNamespace = this.GetFilesCollectionNamespace();
-     var encoderSettings = this.GetMessageEncoderSettings();
-     var idFilter = new BsonDocument("_id", _idSerializationInfo.SerializeValue(id));
-     var findOperation = new FindOperation<GridFSFileInfo<TFileId>>(filesNamespace, _fileInfoSerializer, encoderSettings);
-     findOperation.Filter = idFilter;
-     findOperation.Limit = 1;
-     findOperation.ReadConcern = GetReadConcern();
-     findOperation.SingleBatch = true;
-     return findOperation;
- }
- // Builds a single-batch find on the files collection that returns at most one document, projected to its _id.
- private FindOperation<BsonDocument> CreateIsFilesCollectionEmptyOperation()
- {
-     var filesNamespace = this.GetFilesCollectionNamespace();
-     var encoderSettings = this.GetMessageEncoderSettings();
-     var findOperation = new FindOperation<BsonDocument>(filesNamespace, BsonDocumentSerializer.Instance, encoderSettings);
-     findOperation.Limit = 1;
-     findOperation.ReadConcern = GetReadConcern();
-     findOperation.SingleBatch = true;
-     findOperation.Projection = new BsonDocument("_id", 1);
-     return findOperation;
- }
- // Builds a list-indexes operation for the given collection using the bucket's message encoder settings.
- private ListIndexesOperation CreateListIndexesOperation(CollectionNamespace collectionNamespace)
- {
-     return new ListIndexesOperation(collectionNamespace, this.GetMessageEncoderSettings());
- }
- // Builds the bulk write that $sets the filename of the files collection document with the given _id.
- private BulkMixedWriteOperation CreateRenameOperation(TFileId id, string newFilename)
- {
-     var filesNamespace = this.GetFilesCollectionNamespace();
-     var idFilter = new BsonDocument("_id", _idSerializationInfo.SerializeValue(id));
-     var setFilename = new BsonDocument("$set", new BsonDocument("filename", newFilename));
-     var updateRequest = new UpdateRequest(UpdateType.Update, idFilter, setFilename);
-     var encoderSettings = this.GetMessageEncoderSettings();
-     return new BulkMixedWriteOperation(filesNamespace, new[] { updateRequest }, encoderSettings);
- }
- // Creates the forward-only upload stream for a new file. The stream receives a fork of the binding.
- // The per-upload chunk size takes precedence over the bucket's; the default batch size is the number
- // of chunks that fit in 16 MiB.
- private GridFSUploadStream<TFileId> CreateUploadStream(IReadWriteBindingHandle binding, TFileId id, string filename, GridFSUploadOptions options)
- {
-     // CS0618 is the "member is obsolete" warning.
-     // NOTE(review): which member used below is obsolete is not visible from this file - confirm.
-     #pragma warning disable 618
-     var chunkSizeBytes = options.ChunkSizeBytes ?? _options.ChunkSizeBytes;
-     var batchSize = options.BatchSize ?? (16 * 1024 * 1024 / chunkSizeBytes);
-     return new GridFSForwardOnlyUploadStream<TFileId>(
-         this,
-         binding.Fork(),
-         id,
-         filename,
-         options.Metadata,
-         options.Aliases,
-         options.ContentType,
-         chunkSizeBytes,
-         batchSize,
-         options.DisableMD5);
-     // restore only the warning disabled above; a bare "restore" would restore every warning
-     #pragma warning restore 618
- }
- // Downloads the whole file into a new byte array sized to the stored length.
- // Throws NotSupportedException when the stored length exceeds int.MaxValue.
- private byte[] DownloadAsBytesHelper(IReadBindingHandle binding, GridFSFileInfo<TFileId> fileInfo, GridFSDownloadOptions options, CancellationToken cancellationToken = default(CancellationToken))
- {
-     var storedLength = fileInfo.Length;
-     if (storedLength > int.MaxValue)
-     {
-         throw new NotSupportedException("GridFS stored file is too large to be returned as a byte array.");
-     }
-     var contents = new byte[(int)storedLength];
-     // the memory stream writes straight into the result array
-     using (var contentsStream = new MemoryStream(contents))
-     {
-         DownloadToStreamHelper(binding, fileInfo, contentsStream, options, cancellationToken);
-     }
-     return contents;
- }
/// <summary>
/// Asynchronously downloads a stored file into a newly allocated byte array.
/// </summary>
/// <param name="binding">The read binding used for the download.</param>
/// <param name="fileInfo">The file to download; its length determines the array size.</param>
/// <param name="options">The download options.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task whose result is a byte array of exactly <c>fileInfo.Length</c> bytes filled by the download.</returns>
/// <exception cref="NotSupportedException">The file length exceeds <c>int.MaxValue</c>.</exception>
private async Task<byte[]> DownloadAsBytesHelperAsync(IReadBindingHandle binding, GridFSFileInfo<TFileId> fileInfo, GridFSDownloadOptions options, CancellationToken cancellationToken = default(CancellationToken))
{
    var length = fileInfo.Length;
    if (length > int.MaxValue)
    {
        throw new NotSupportedException("GridFS stored file is too large to be returned as a byte array.");
    }

    // The memory stream writes directly into the array that is handed back to the caller.
    var contents = new byte[(int)length];
    using (var sink = new MemoryStream(contents))
    {
        await DownloadToStreamHelperAsync(binding, fileInfo, sink, options, cancellationToken).ConfigureAwait(false);
    }

    return contents;
}
/// <summary>
/// Copies the contents of a stored file to a destination stream.
/// </summary>
/// <param name="binding">The read binding; a fork of it is handed to the download stream.</param>
/// <param name="fileInfo">The file to download.</param>
/// <param name="destination">The stream that receives the file contents.</param>
/// <param name="options">The download options; <c>CheckMD5</c> defaults to false when unset.</param>
/// <param name="cancellationToken">The cancellation token passed to each read.</param>
private void DownloadToStreamHelper(IReadBindingHandle binding, GridFSFileInfo<TFileId> fileInfo, Stream destination, GridFSDownloadOptions options, CancellationToken cancellationToken = default(CancellationToken))
{
    var verifyMD5 = options.CheckMD5 ?? false;
    using (var source = new GridFSForwardOnlyDownloadStream<TFileId>(this, binding.Fork(), fileInfo, verifyMD5))
    {
        var remaining = source.Length;

        // The buffer holds one chunk; the final pass may use only part of it.
        var chunkBuffer = new byte[fileInfo.ChunkSizeBytes];
        while (remaining > 0)
        {
            var bytesToCopy = (int)Math.Min(chunkBuffer.Length, remaining);
            source.ReadBytes(chunkBuffer, 0, bytesToCopy, cancellationToken);
            destination.Write(chunkBuffer, 0, bytesToCopy);
            remaining -= bytesToCopy;
        }
    }
}
/// <summary>
/// Asynchronously copies the contents of a stored file to a destination stream.
/// </summary>
/// <param name="binding">The read binding; a fork of it is handed to the download stream.</param>
/// <param name="fileInfo">The file to download.</param>
/// <param name="destination">The stream that receives the file contents.</param>
/// <param name="options">The download options; <c>CheckMD5</c> defaults to false when unset.</param>
/// <param name="cancellationToken">The cancellation token passed to each read, write and the final close.</param>
/// <returns>A task that completes once all bytes are copied and the source stream has been closed.</returns>
private async Task DownloadToStreamHelperAsync(IReadBindingHandle binding, GridFSFileInfo<TFileId> fileInfo, Stream destination, GridFSDownloadOptions options, CancellationToken cancellationToken = default(CancellationToken))
{
    var verifyMD5 = options.CheckMD5 ?? false;
    using (var source = new GridFSForwardOnlyDownloadStream<TFileId>(this, binding.Fork(), fileInfo, verifyMD5))
    {
        var remaining = source.Length;

        // The buffer holds one chunk; the final pass may use only part of it.
        var chunkBuffer = new byte[fileInfo.ChunkSizeBytes];
        while (remaining > 0)
        {
            var bytesToCopy = (int)Math.Min(chunkBuffer.Length, remaining);
            await source.ReadBytesAsync(chunkBuffer, 0, bytesToCopy, cancellationToken).ConfigureAwait(false);
            await destination.WriteAsync(chunkBuffer, 0, bytesToCopy, cancellationToken).ConfigureAwait(false);
            remaining -= bytesToCopy;
        }

        // Close explicitly with the asynchronous API before the using block disposes the stream.
        await source.CloseAsync(cancellationToken).ConfigureAwait(false);
    }
}
/// <summary>
/// Runs the one-time index check for this bucket while holding the ensure-indexes semaphore.
/// </summary>
/// <remarks>
/// Missing files/chunks collection indexes are created only when the files collection is empty.
/// The done flag is set after the first completed check, whether or not any index was created.
/// </remarks>
/// <param name="binding">The read-write binding used for the checks and index creation.</param>
/// <param name="cancellationToken">The cancellation token.</param>
private void EnsureIndexes(IReadWriteBindingHandle binding, CancellationToken cancellationToken)
{
    _ensureIndexesSemaphore.Wait(cancellationToken);
    try
    {
        if (_ensureIndexesDone)
        {
            return;
        }

        if (IsFilesCollectionEmpty(binding, cancellationToken))
        {
            var filesIndexesPresent = FilesCollectionIndexesExist(binding, cancellationToken);
            if (!filesIndexesPresent)
            {
                CreateFilesCollectionIndexes(binding, cancellationToken);
            }

            var chunksIndexesPresent = ChunksCollectionIndexesExist(binding, cancellationToken);
            if (!chunksIndexesPresent)
            {
                CreateChunksCollectionIndexes(binding, cancellationToken);
            }
        }

        _ensureIndexesDone = true;
    }
    finally
    {
        // Runs on every exit path, including the early return above.
        _ensureIndexesSemaphore.Release();
    }
}
/// <summary>
/// Asynchronously runs the one-time index check for this bucket while holding the ensure-indexes semaphore.
/// </summary>
/// <remarks>
/// Missing files/chunks collection indexes are created only when the files collection is empty.
/// The done flag is set after the first completed check, whether or not any index was created.
/// </remarks>
/// <param name="binding">The read-write binding used for the checks and index creation.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that completes when the check has finished.</returns>
private async Task EnsureIndexesAsync(IReadWriteBindingHandle binding, CancellationToken cancellationToken)
{
    await _ensureIndexesSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        if (_ensureIndexesDone)
        {
            return;
        }

        if (await IsFilesCollectionEmptyAsync(binding, cancellationToken).ConfigureAwait(false))
        {
            var filesIndexesPresent = await FilesCollectionIndexesExistAsync(binding, cancellationToken).ConfigureAwait(false);
            if (!filesIndexesPresent)
            {
                await CreateFilesCollectionIndexesAsync(binding, cancellationToken).ConfigureAwait(false);
            }

            var chunksIndexesPresent = await ChunksCollectionIndexesExistAsync(binding, cancellationToken).ConfigureAwait(false);
            if (!chunksIndexesPresent)
            {
                await CreateChunksCollectionIndexesAsync(binding, cancellationToken).ConfigureAwait(false);
            }
        }

        _ensureIndexesDone = true;
    }
    finally
    {
        // Runs on every exit path, including the early return above.
        _ensureIndexesSemaphore.Release();
    }
}
/// <summary>
/// Determines whether the given index list contains an index keyed on
/// <c>{ filename: 1, uploadDate: 1 }</c>.
/// </summary>
/// <param name="indexes">The index documents to search.</param>
/// <returns>True if an index with that key is present; otherwise false.</returns>
private bool FilesCollectionIndexesExist(List<BsonDocument> indexes)
{
    var expectedKey = new BsonDocument();
    expectedKey.Add("filename", 1);
    expectedKey.Add("uploadDate", 1);
    return IndexExists(indexes, expectedKey);
}
/// <summary>
/// Lists the indexes of the files collection and checks whether the expected files index is among them.
/// </summary>
/// <param name="binding">The read binding used to list the indexes.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>True if the expected index exists; otherwise false.</returns>
private bool FilesCollectionIndexesExist(IReadBindingHandle binding, CancellationToken cancellationToken)
{
    var filesNamespace = this.GetFilesCollectionNamespace();
    var existingIndexes = ListIndexes(binding, filesNamespace, cancellationToken);
    return FilesCollectionIndexesExist(existingIndexes);
}
/// <summary>
/// Asynchronously lists the indexes of the files collection and checks whether the expected
/// files index is among them.
/// </summary>
/// <param name="binding">The read binding used to list the indexes.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task whose result is true if the expected index exists; otherwise false.</returns>
private async Task<bool> FilesCollectionIndexesExistAsync(IReadBindingHandle binding, CancellationToken cancellationToken)
{
    var filesNamespace = this.GetFilesCollectionNamespace();
    var existingIndexes = await ListIndexesAsync(binding, filesNamespace, cancellationToken).ConfigureAwait(false);
    return FilesCollectionIndexesExist(existingIndexes);
}
/// <summary>
/// Looks up the file info for the given file id.
/// </summary>
/// <param name="binding">The read binding used to run the lookup.</param>
/// <param name="id">The id of the file.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The first file info produced by the lookup.</returns>
/// <exception cref="GridFSFileNotFoundException">The lookup produced no file info.</exception>
private GridFSFileInfo<TFileId> GetFileInfo(IReadBindingHandle binding, TFileId id, CancellationToken cancellationToken)
{
    var lookup = CreateGetFileInfoOperation(id);
    using (var cursor = lookup.Execute(binding, cancellationToken))
    {
        var match = cursor.FirstOrDefault(cancellationToken);
        if (match != null)
        {
            return match;
        }

        // The id is only serialized when it is needed for the exception.
        throw new GridFSFileNotFoundException(_idSerializationInfo.SerializeValue(id));
    }
}
/// <summary>
/// Asynchronously looks up the file info for the given file id.
/// </summary>
/// <param name="binding">The read binding used to run the lookup.</param>
/// <param name="id">The id of the file.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task whose result is the first file info produced by the lookup.</returns>
/// <exception cref="GridFSFileNotFoundException">The lookup produced no file info.</exception>
private async Task<GridFSFileInfo<TFileId>> GetFileInfoAsync(IReadBindingHandle binding, TFileId id, CancellationToken cancellationToken)
{
    var lookup = CreateGetFileInfoOperation(id);
    using (var cursor = await lookup.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false))
    {
        var match = await cursor.FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
        if (match != null)
        {
            return match;
        }

        // The id is only serialized when it is needed for the exception.
        throw new GridFSFileNotFoundException(_idSerializationInfo.SerializeValue(id));
    }
}
/// <summary>
/// Looks up the file info for the given filename and revision.
/// </summary>
/// <param name="binding">The read binding used to run the lookup.</param>
/// <param name="filename">The filename to look up.</param>
/// <param name="revision">The revision to look up.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The first file info produced by the lookup.</returns>
/// <exception cref="GridFSFileNotFoundException">The lookup produced no file info.</exception>
private GridFSFileInfo<TFileId> GetFileInfoByName(IReadBindingHandle binding, string filename, int revision, CancellationToken cancellationToken)
{
    var lookup = CreateGetFileInfoByNameOperation(filename, revision);
    using (var cursor = lookup.Execute(binding, cancellationToken))
    {
        var match = cursor.FirstOrDefault(cancellationToken);
        if (match != null)
        {
            return match;
        }

        throw new GridFSFileNotFoundException(filename, revision);
    }
}
/// <summary>
/// Asynchronously looks up the file info for the given filename and revision.
/// </summary>
/// <param name="binding">The read binding used to run the lookup.</param>
/// <param name="filename">The filename to look up.</param>
/// <param name="revision">The revision to look up.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task whose result is the first file info produced by the lookup.</returns>
/// <exception cref="GridFSFileNotFoundException">The lookup produced no file info.</exception>
private async Task<GridFSFileInfo<TFileId>> GetFileInfoByNameAsync(IReadBindingHandle binding, string filename, int revision, CancellationToken cancellationToken)
{
    var lookup = CreateGetFileInfoByNameOperation(filename, revision);
    using (var cursor = await lookup.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false))
    {
        var match = await cursor.FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false);
        if (match != null)
        {
            return match;
        }

        throw new GridFSFileNotFoundException(filename, revision);
    }
}
/// <summary>
/// Resolves the read concern for this bucket.
/// </summary>
/// <returns>The read concern from the bucket options, or the database's read concern when the options specify none.</returns>
private ReadConcern GetReadConcern()
{
    var bucketReadConcern = _options.ReadConcern;
    return bucketReadConcern ?? _database.Settings.ReadConcern;
}
/// <summary>
/// Selects a server that satisfies the effective read preference and wraps it in a read binding handle.
/// </summary>
/// <remarks>
/// The read preference comes from the bucket options, falling back to the database settings.
/// </remarks>
/// <param name="cancellationToken">The cancellation token used during server selection.</param>
/// <returns>A handle to a single-server read binding.</returns>
private IReadBindingHandle GetSingleServerReadBinding(CancellationToken cancellationToken)
{
    var effectiveReadPreference = _options.ReadPreference ?? _database.Settings.ReadPreference;
    var selectedServer = _cluster.SelectServer(new ReadPreferenceServerSelector(effectiveReadPreference), cancellationToken);
    var singleServerBinding = new SingleServerReadBinding(selectedServer, effectiveReadPreference, NoCoreSession.NewHandle());
    return new ReadBindingHandle(singleServerBinding);
}
/// <summary>
/// Asynchronously selects a server that satisfies the effective read preference and wraps it in
/// a read binding handle.
/// </summary>
/// <remarks>
/// The read preference comes from the bucket options, falling back to the database settings.
/// </remarks>
/// <param name="cancellationToken">The cancellation token used during server selection.</param>
/// <returns>A task whose result is a handle to a single-server read binding.</returns>
private async Task<IReadBindingHandle> GetSingleServerReadBindingAsync(CancellationToken cancellationToken)
{
    var effectiveReadPreference = _options.ReadPreference ?? _database.Settings.ReadPreference;
    var selectedServer = await _cluster
        .SelectServerAsync(new ReadPreferenceServerSelector(effectiveReadPreference), cancellationToken)
        .ConfigureAwait(false);
    var singleServerBinding = new SingleServerReadBinding(selectedServer, effectiveReadPreference, NoCoreSession.NewHandle());
    return new ReadBindingHandle(singleServerBinding);
}
/// <summary>
/// Selects a server using the writable server selector and wraps it in a read-write binding handle.
/// </summary>
/// <param name="cancellationToken">The cancellation token used during server selection.</param>
/// <returns>A handle to a single-server read-write binding.</returns>
private IReadWriteBindingHandle GetSingleServerReadWriteBinding(CancellationToken cancellationToken)
{
    var writableServer = _cluster.SelectServer(WritableServerSelector.Instance, cancellationToken);
    var singleServerBinding = new SingleServerReadWriteBinding(writableServer, NoCoreSession.NewHandle());
    return new ReadWriteBindingHandle(singleServerBinding);
}
/// <summary>
/// Asynchronously selects a server using the writable server selector and wraps it in a
/// read-write binding handle.
/// </summary>
/// <param name="cancellationToken">The cancellation token used during server selection.</param>
/// <returns>A task whose result is a handle to a single-server read-write binding.</returns>
private async Task<IReadWriteBindingHandle> GetSingleServerReadWriteBindingAsync(CancellationToken cancellationToken)
{
    var writableServer = await _cluster
        .SelectServerAsync(WritableServerSelector.Instance, cancellationToken)
        .ConfigureAwait(false);
    var singleServerBinding = new SingleServerReadWriteBinding(writableServer, NoCoreSession.NewHandle());
    return new ReadWriteBindingHandle(singleServerBinding);
}
/// <summary>
/// Determines whether any index document in the list has a <c>key</c> element equal to the given key.
/// </summary>
/// <param name="indexes">The index documents to search.</param>
/// <param name="key">The index key document to look for.</param>
/// <returns>True if a matching index is found; otherwise false.</returns>
private bool IndexExists(List<BsonDocument> indexes, BsonDocument key)
{
    // List<T>.Exists stops at the first match, just like an early-returning loop.
    return indexes.Exists(indexDocument => indexDocument["key"].Equals(key));
}
/// <summary>
/// Determines whether the files collection contains no documents.
/// </summary>
/// <param name="binding">The binding used to run the probe query.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>True if the probe query returns no document; otherwise false.</returns>
private bool IsFilesCollectionEmpty(IReadWriteBindingHandle binding, CancellationToken cancellationToken)
{
    var probe = CreateIsFilesCollectionEmptyOperation();
    using (var cursor = probe.Execute(binding, cancellationToken))
    {
        return cursor.FirstOrDefault(cancellationToken) == null;
    }
}
/// <summary>
/// Asynchronously determines whether the files collection contains no documents.
/// </summary>
/// <param name="binding">The binding used to run the probe query.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task whose result is true if the probe query returns no document; otherwise false.</returns>
private async Task<bool> IsFilesCollectionEmptyAsync(IReadWriteBindingHandle binding, CancellationToken cancellationToken)
{
    var probe = CreateIsFilesCollectionEmptyOperation();
    using (var cursor = await probe.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false))
    {
        return await cursor.FirstOrDefaultAsync(cancellationToken).ConfigureAwait(false) == null;
    }
}
/// <summary>
/// Lists the indexes of a collection.
/// </summary>
/// <param name="binding">The read binding used to run the list-indexes operation.</param>
/// <param name="collectionNamespace">The namespace of the collection whose indexes are listed.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The index documents returned by the operation.</returns>
private List<BsonDocument> ListIndexes(IReadBinding binding, CollectionNamespace collectionNamespace, CancellationToken cancellationToken)
{
    var operation = CreateListIndexesOperation(collectionNamespace);
    var cursor = operation.Execute(binding, cancellationToken);

    // Pass the token to the enumeration too, as ListIndexesAsync does with ToListAsync;
    // without it, draining the cursor cannot be cancelled.
    return cursor.ToList(cancellationToken);
}
/// <summary>
/// Asynchronously lists the indexes of a collection.
/// </summary>
/// <param name="binding">The read binding used to run the list-indexes operation.</param>
/// <param name="collectionNamespace">The namespace of the collection whose indexes are listed.</param>
/// <param name="cancellationToken">The cancellation token, used for both the operation and the cursor enumeration.</param>
/// <returns>A task whose result is the list of index documents returned by the operation.</returns>
private async Task<List<BsonDocument>> ListIndexesAsync(IReadBinding binding, CollectionNamespace collectionNamespace, CancellationToken cancellationToken)
{
    var listIndexes = CreateListIndexesOperation(collectionNamespace);
    var indexCursor = await listIndexes.ExecuteAsync(binding, cancellationToken).ConfigureAwait(false);
    var indexDocuments = await indexCursor.ToListAsync(cancellationToken).ConfigureAwait(false);
    return indexDocuments;
}
- }
- }
|