| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622 |
/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Bson.IO;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Serializers;
using MongoDB.Driver.Core.Bindings;
using MongoDB.Driver.Core.Clusters;
using MongoDB.Driver.Core.Misc;
using MongoDB.Driver.Core.Operations;
using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
namespace MongoDB.Driver
{
    /// <summary>
    /// Implementation of <see cref="MongoDatabaseBase"/> that builds core operations for a single
    /// database namespace and runs them through an <see cref="IOperationExecutor"/>.
    /// </summary>
    /// <remarks>
    /// Every public operation exists in two flavours: one taking an explicit session, and one that
    /// starts an implicit session via the operation executor and delegates to the former.
    /// </remarks>
    internal sealed class MongoDatabaseImpl : MongoDatabaseBase
    {
        // private fields
        private readonly IMongoClient _client;
        private readonly ICluster _cluster;
        private readonly DatabaseNamespace _databaseNamespace;
        private readonly IOperationExecutor _operationExecutor;
        private readonly MongoDatabaseSettings _settings;

        // constructors
        /// <summary>
        /// Initializes a new instance. All arguments must be non-null; the settings are frozen
        /// (the value returned by <c>Freeze()</c> is what this instance keeps).
        /// </summary>
        public MongoDatabaseImpl(IMongoClient client, DatabaseNamespace databaseNamespace, MongoDatabaseSettings settings, ICluster cluster, IOperationExecutor operationExecutor)
        {
            _client = Ensure.IsNotNull(client, nameof(client));
            _databaseNamespace = Ensure.IsNotNull(databaseNamespace, nameof(databaseNamespace));
            _settings = Ensure.IsNotNull(settings, nameof(settings)).Freeze();
            _cluster = Ensure.IsNotNull(cluster, nameof(cluster));
            _operationExecutor = Ensure.IsNotNull(operationExecutor, nameof(operationExecutor));
        }

        // public properties
        /// <summary>Gets the client passed to the constructor.</summary>
        public override IMongoClient Client
        {
            get { return _client; }
        }

        /// <summary>Gets the namespace of this database.</summary>
        public override DatabaseNamespace DatabaseNamespace
        {
            get { return _databaseNamespace; }
        }

        /// <summary>Gets the (frozen) settings of this database.</summary>
        public override MongoDatabaseSettings Settings
        {
            get { return _settings; }
        }

        // public methods
        /// <summary>Creates a collection using an implicit session.</summary>
        public override void CreateCollection(string name, CreateCollectionOptions options, CancellationToken cancellationToken)
        {
            UsingImplicitSession(session => CreateCollection(session, name, options, cancellationToken), cancellationToken);
        }

        /// <summary>
        /// Creates a collection within the given session.
        /// </summary>
        /// <remarks>
        /// <paramref name="options"/> may be null, a plain <see cref="CreateCollectionOptions"/>
        /// (coerced to the BsonDocument-typed generic options), or a generic options instance, in
        /// which case the matching generic helper is selected at run time via reflection.
        /// </remarks>
        public override void CreateCollection(IClientSessionHandle session, string name, CreateCollectionOptions options, CancellationToken cancellationToken)
        {
            Ensure.IsNotNull(session, nameof(session));
            Ensure.IsNotNullOrEmpty(name, nameof(name));

            if (options == null)
            {
                CreateCollectionHelper<BsonDocument>(session, name, null, cancellationToken);
                return;
            }

            if (options.GetType() == typeof(CreateCollectionOptions))
            {
                var genericOptions = CreateCollectionOptions<BsonDocument>.CoercedFrom(options);
                CreateCollectionHelper<BsonDocument>(session, name, genericOptions, cancellationToken);
                return;
            }

            InvokeCreateCollectionHelper(nameof(CreateCollectionHelper), session, name, options, cancellationToken);
        }

        /// <summary>Asynchronously creates a collection using an implicit session.</summary>
        public override Task CreateCollectionAsync(string name, CreateCollectionOptions options, CancellationToken cancellationToken)
        {
            return UsingImplicitSessionAsync(session => CreateCollectionAsync(session, name, options, cancellationToken), cancellationToken);
        }

        /// <summary>
        /// Asynchronously creates a collection within the given session. Options are handled
        /// exactly as in the synchronous overload.
        /// </summary>
        public override Task CreateCollectionAsync(IClientSessionHandle session, string name, CreateCollectionOptions options, CancellationToken cancellationToken)
        {
            Ensure.IsNotNull(session, nameof(session));
            Ensure.IsNotNullOrEmpty(name, nameof(name));

            if (options == null)
            {
                return CreateCollectionHelperAsync<BsonDocument>(session, name, null, cancellationToken);
            }

            if (options.GetType() == typeof(CreateCollectionOptions))
            {
                var genericOptions = CreateCollectionOptions<BsonDocument>.CoercedFrom(options);
                return CreateCollectionHelperAsync<BsonDocument>(session, name, genericOptions, cancellationToken);
            }

            return (Task)InvokeCreateCollectionHelper(nameof(CreateCollectionHelperAsync), session, name, options, cancellationToken);
        }

        /// <summary>Creates a view using an implicit session.</summary>
        public override void CreateView<TDocument, TResult>(string viewName, string viewOn, PipelineDefinition<TDocument, TResult> pipeline, CreateViewOptions<TDocument> options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            UsingImplicitSession(session => CreateView(session, viewName, viewOn, pipeline, options, cancellationToken), cancellationToken);
        }

        /// <summary>
        /// Creates a view named <paramref name="viewName"/> on <paramref name="viewOn"/> from the
        /// rendered pipeline. Null <paramref name="options"/> are replaced by default options.
        /// </summary>
        public override void CreateView<TDocument, TResult>(IClientSessionHandle session, string viewName, string viewOn, PipelineDefinition<TDocument, TResult> pipeline, CreateViewOptions<TDocument> options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(session, nameof(session));
            Ensure.IsNotNull(viewName, nameof(viewName));
            Ensure.IsNotNull(viewOn, nameof(viewOn));
            Ensure.IsNotNull(pipeline, nameof(pipeline));
            options = options ?? new CreateViewOptions<TDocument>();
            var operation = CreateCreateViewOperation(viewName, viewOn, pipeline, options);
            ExecuteWriteOperation(session, operation, cancellationToken);
        }

        /// <summary>Asynchronously creates a view using an implicit session.</summary>
        public override Task CreateViewAsync<TDocument, TResult>(string viewName, string viewOn, PipelineDefinition<TDocument, TResult> pipeline, CreateViewOptions<TDocument> options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            return UsingImplicitSessionAsync(session => CreateViewAsync(session, viewName, viewOn, pipeline, options, cancellationToken), cancellationToken);
        }

        /// <summary>Asynchronous counterpart of the session-taking <c>CreateView</c> overload.</summary>
        public override Task CreateViewAsync<TDocument, TResult>(IClientSessionHandle session, string viewName, string viewOn, PipelineDefinition<TDocument, TResult> pipeline, CreateViewOptions<TDocument> options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(session, nameof(session));
            Ensure.IsNotNull(viewName, nameof(viewName));
            Ensure.IsNotNull(viewOn, nameof(viewOn));
            Ensure.IsNotNull(pipeline, nameof(pipeline));
            options = options ?? new CreateViewOptions<TDocument>();
            var operation = CreateCreateViewOperation(viewName, viewOn, pipeline, options);
            return ExecuteWriteOperationAsync(session, operation, cancellationToken);
        }

        /// <summary>Drops a collection using an implicit session.</summary>
        public override void DropCollection(string name, CancellationToken cancellationToken)
        {
            UsingImplicitSession(session => DropCollection(session, name, cancellationToken), cancellationToken);
        }

        /// <summary>Drops the named collection of this database within the given session.</summary>
        public override void DropCollection(IClientSessionHandle session, string name, CancellationToken cancellationToken)
        {
            Ensure.IsNotNull(session, nameof(session));
            Ensure.IsNotNullOrEmpty(name, nameof(name));
            var operation = CreateDropCollectionOperation(name);
            ExecuteWriteOperation(session, operation, cancellationToken);
        }

        /// <summary>Asynchronously drops a collection using an implicit session.</summary>
        public override Task DropCollectionAsync(string name, CancellationToken cancellationToken)
        {
            return UsingImplicitSessionAsync(session => DropCollectionAsync(session, name, cancellationToken), cancellationToken);
        }

        /// <summary>Asynchronously drops the named collection within the given session.</summary>
        public override Task DropCollectionAsync(IClientSessionHandle session, string name, CancellationToken cancellationToken)
        {
            Ensure.IsNotNull(session, nameof(session));
            Ensure.IsNotNullOrEmpty(name, nameof(name));
            var operation = CreateDropCollectionOperation(name);
            return ExecuteWriteOperationAsync(session, operation, cancellationToken);
        }

        /// <summary>
        /// Returns a collection object for <paramref name="name"/>. The supplied settings are cloned
        /// (or created when null) before this database's settings are applied as defaults, so the
        /// caller's instance is not modified here.
        /// </summary>
        public override IMongoCollection<TDocument> GetCollection<TDocument>(string name, MongoCollectionSettings settings)
        {
            Ensure.IsNotNullOrEmpty(name, nameof(name));

            settings = settings == null ?
                new MongoCollectionSettings() :
                settings.Clone();
            settings.ApplyDefaultValues(_settings);

            return new MongoCollectionImpl<TDocument>(this, new CollectionNamespace(_databaseNamespace, name), settings, _cluster, _operationExecutor);
        }

        /// <summary>Lists collection names using an implicit session.</summary>
        public override IAsyncCursor<string> ListCollectionNames(ListCollectionNamesOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            return UsingImplicitSession(session => ListCollectionNames(session, options, cancellationToken), cancellationToken);
        }

        /// <summary>
        /// Lists collection names within the given session. The underlying list-collections
        /// operation is created with <c>NameOnly = true</c> and each batch is projected to the
        /// "name" element of its documents.
        /// </summary>
        public override IAsyncCursor<string> ListCollectionNames(IClientSessionHandle session, ListCollectionNamesOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(session, nameof(session));
            var operation = CreateListCollectionNamesOperation(options);
            var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, ReadPreference.Primary);
            var cursor = ExecuteReadOperation(session, operation, effectiveReadPreference, cancellationToken);
            return new BatchTransformingAsyncCursor<BsonDocument, string>(cursor, ExtractCollectionNames);
        }

        /// <summary>Asynchronously lists collection names using an implicit session.</summary>
        public override Task<IAsyncCursor<string>> ListCollectionNamesAsync(ListCollectionNamesOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            return UsingImplicitSessionAsync(session => ListCollectionNamesAsync(session, options, cancellationToken), cancellationToken);
        }

        /// <summary>Asynchronous counterpart of the session-taking <c>ListCollectionNames</c> overload.</summary>
        public override async Task<IAsyncCursor<string>> ListCollectionNamesAsync(IClientSessionHandle session, ListCollectionNamesOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(session, nameof(session));
            var operation = CreateListCollectionNamesOperation(options);
            var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, ReadPreference.Primary);
            var cursor = await ExecuteReadOperationAsync(session, operation, effectiveReadPreference, cancellationToken).ConfigureAwait(false);
            return new BatchTransformingAsyncCursor<BsonDocument, string>(cursor, ExtractCollectionNames);
        }

        /// <summary>Lists collections using an implicit session.</summary>
        public override IAsyncCursor<BsonDocument> ListCollections(ListCollectionsOptions options, CancellationToken cancellationToken)
        {
            return UsingImplicitSession(session => ListCollections(session, options, cancellationToken), cancellationToken);
        }

        /// <summary>
        /// Lists the collections of this database within the given session; the read preference is
        /// resolved from the session with <see cref="ReadPreference.Primary"/> as the default.
        /// </summary>
        public override IAsyncCursor<BsonDocument> ListCollections(IClientSessionHandle session, ListCollectionsOptions options, CancellationToken cancellationToken)
        {
            Ensure.IsNotNull(session, nameof(session));
            var operation = CreateListCollectionsOperation(options);
            var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, ReadPreference.Primary);
            return ExecuteReadOperation(session, operation, effectiveReadPreference, cancellationToken);
        }

        /// <summary>Asynchronously lists collections using an implicit session.</summary>
        public override Task<IAsyncCursor<BsonDocument>> ListCollectionsAsync(ListCollectionsOptions options, CancellationToken cancellationToken)
        {
            return UsingImplicitSessionAsync(session => ListCollectionsAsync(session, options, cancellationToken), cancellationToken);
        }

        /// <summary>Asynchronous counterpart of the session-taking <c>ListCollections</c> overload.</summary>
        public override Task<IAsyncCursor<BsonDocument>> ListCollectionsAsync(IClientSessionHandle session, ListCollectionsOptions options, CancellationToken cancellationToken)
        {
            Ensure.IsNotNull(session, nameof(session));
            var operation = CreateListCollectionsOperation(options);
            var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, ReadPreference.Primary);
            return ExecuteReadOperationAsync(session, operation, effectiveReadPreference, cancellationToken);
        }

        /// <summary>Renames a collection using an implicit session.</summary>
        public override void RenameCollection(string oldName, string newName, RenameCollectionOptions options, CancellationToken cancellationToken)
        {
            UsingImplicitSession(session => RenameCollection(session, oldName, newName, options, cancellationToken), cancellationToken);
        }

        /// <summary>
        /// Renames a collection of this database within the given session. Null
        /// <paramref name="options"/> are replaced by default options.
        /// </summary>
        public override void RenameCollection(IClientSessionHandle session, string oldName, string newName, RenameCollectionOptions options, CancellationToken cancellationToken)
        {
            Ensure.IsNotNull(session, nameof(session));
            Ensure.IsNotNullOrEmpty(oldName, nameof(oldName));
            Ensure.IsNotNullOrEmpty(newName, nameof(newName));
            options = options ?? new RenameCollectionOptions();
            var operation = CreateRenameCollectionOperation(oldName, newName, options);
            ExecuteWriteOperation(session, operation, cancellationToken);
        }

        /// <summary>Asynchronously renames a collection using an implicit session.</summary>
        public override Task RenameCollectionAsync(string oldName, string newName, RenameCollectionOptions options, CancellationToken cancellationToken)
        {
            return UsingImplicitSessionAsync(session => RenameCollectionAsync(session, oldName, newName, options, cancellationToken), cancellationToken);
        }

        /// <summary>Asynchronous counterpart of the session-taking <c>RenameCollection</c> overload.</summary>
        public override Task RenameCollectionAsync(IClientSessionHandle session, string oldName, string newName, RenameCollectionOptions options, CancellationToken cancellationToken)
        {
            Ensure.IsNotNull(session, nameof(session));
            Ensure.IsNotNullOrEmpty(oldName, nameof(oldName));
            Ensure.IsNotNullOrEmpty(newName, nameof(newName));
            options = options ?? new RenameCollectionOptions();
            var operation = CreateRenameCollectionOperation(oldName, newName, options);
            return ExecuteWriteOperationAsync(session, operation, cancellationToken);
        }

        /// <summary>Runs a command using an implicit session.</summary>
        public override TResult RunCommand<TResult>(Command<TResult> command, ReadPreference readPreference = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            return UsingImplicitSession(session => RunCommand(session, command, readPreference, cancellationToken), cancellationToken);
        }

        /// <summary>
        /// Runs a command against this database within the given session. The effective read
        /// preference is resolved from the session, <paramref name="readPreference"/> and a default
        /// of <see cref="ReadPreference.Primary"/>.
        /// </summary>
        public override TResult RunCommand<TResult>(IClientSessionHandle session, Command<TResult> command, ReadPreference readPreference = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(session, nameof(session));
            Ensure.IsNotNull(command, nameof(command));
            var operation = CreateRunCommandOperation(command);
            var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, readPreference, ReadPreference.Primary);
            return ExecuteReadOperation(session, operation, effectiveReadPreference, cancellationToken);
        }

        /// <summary>Asynchronously runs a command using an implicit session.</summary>
        public override Task<TResult> RunCommandAsync<TResult>(Command<TResult> command, ReadPreference readPreference = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            return UsingImplicitSessionAsync(session => RunCommandAsync(session, command, readPreference, cancellationToken), cancellationToken);
        }

        /// <summary>Asynchronous counterpart of the session-taking <c>RunCommand</c> overload.</summary>
        public override Task<TResult> RunCommandAsync<TResult>(IClientSessionHandle session, Command<TResult> command, ReadPreference readPreference = null, CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(session, nameof(session));
            Ensure.IsNotNull(command, nameof(command));
            var operation = CreateRunCommandOperation(command);
            var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, readPreference, ReadPreference.Primary);
            return ExecuteReadOperationAsync(session, operation, effectiveReadPreference, cancellationToken);
        }

        /// <summary>Opens a change stream on this database using an implicit session.</summary>
        public override IAsyncCursor<TResult> Watch<TResult>(
            PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
            ChangeStreamOptions options = null,
            CancellationToken cancellationToken = default(CancellationToken))
        {
            return UsingImplicitSession(session => Watch(session, pipeline, options, cancellationToken), cancellationToken);
        }

        /// <summary>
        /// Opens a change stream on this database within the given session. The read preference
        /// defaults to the one in this database's settings.
        /// </summary>
        public override IAsyncCursor<TResult> Watch<TResult>(
            IClientSessionHandle session,
            PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
            ChangeStreamOptions options = null,
            CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(session, nameof(session));
            Ensure.IsNotNull(pipeline, nameof(pipeline));
            var operation = CreateChangeStreamOperation(pipeline, options);
            return ExecuteReadOperation(session, operation, cancellationToken);
        }

        /// <summary>Asynchronously opens a change stream using an implicit session.</summary>
        public override Task<IAsyncCursor<TResult>> WatchAsync<TResult>(
            PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
            ChangeStreamOptions options = null,
            CancellationToken cancellationToken = default(CancellationToken))
        {
            return UsingImplicitSessionAsync(session => WatchAsync(session, pipeline, options, cancellationToken), cancellationToken);
        }

        /// <summary>Asynchronous counterpart of the session-taking <c>Watch</c> overload.</summary>
        public override Task<IAsyncCursor<TResult>> WatchAsync<TResult>(
            IClientSessionHandle session,
            PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
            ChangeStreamOptions options = null,
            CancellationToken cancellationToken = default(CancellationToken))
        {
            Ensure.IsNotNull(session, nameof(session));
            Ensure.IsNotNull(pipeline, nameof(pipeline));
            var operation = CreateChangeStreamOperation(pipeline, options);
            return ExecuteReadOperationAsync(session, operation, cancellationToken);
        }

        /// <summary>Returns a new database object whose cloned settings use the given read concern.</summary>
        public override IMongoDatabase WithReadConcern(ReadConcern readConcern)
        {
            Ensure.IsNotNull(readConcern, nameof(readConcern));
            var newSettings = _settings.Clone();
            newSettings.ReadConcern = readConcern;
            return new MongoDatabaseImpl(_client, _databaseNamespace, newSettings, _cluster, _operationExecutor);
        }

        /// <summary>Returns a new database object whose cloned settings use the given read preference.</summary>
        public override IMongoDatabase WithReadPreference(ReadPreference readPreference)
        {
            Ensure.IsNotNull(readPreference, nameof(readPreference));
            var newSettings = _settings.Clone();
            newSettings.ReadPreference = readPreference;
            return new MongoDatabaseImpl(_client, _databaseNamespace, newSettings, _cluster, _operationExecutor);
        }

        /// <summary>Returns a new database object whose cloned settings use the given write concern.</summary>
        public override IMongoDatabase WithWriteConcern(WriteConcern writeConcern)
        {
            Ensure.IsNotNull(writeConcern, nameof(writeConcern));
            var newSettings = _settings.Clone();
            newSettings.WriteConcern = writeConcern;
            return new MongoDatabaseImpl(_client, _databaseNamespace, newSettings, _cluster, _operationExecutor);
        }

        // private methods
        // Invoked directly and, by name, through reflection from InvokeCreateCollectionHelper;
        // keep the name unique within this class.
        private void CreateCollectionHelper<TDocument>(IClientSessionHandle session, string name, CreateCollectionOptions<TDocument> options, CancellationToken cancellationToken)
        {
            options = options ?? new CreateCollectionOptions<TDocument>();
            var operation = CreateCreateCollectionOperation(name, options);
            ExecuteWriteOperation(session, operation, cancellationToken);
        }

        // Invoked directly and, by name, through reflection from InvokeCreateCollectionHelper;
        // keep the name unique within this class.
        private Task CreateCollectionHelperAsync<TDocument>(IClientSessionHandle session, string name, CreateCollectionOptions<TDocument> options, CancellationToken cancellationToken)
        {
            options = options ?? new CreateCollectionOptions<TDocument>();
            var operation = CreateCreateCollectionOperation(name, options);
            return ExecuteWriteOperationAsync(session, operation, cancellationToken);
        }

        // NOTE(review): nothing in this class appears to call this non-generic overload (the helpers
        // pass CreateCollectionOptions<TDocument>, which binds to the generic overload below).
        // It also copies fewer properties than the generic overload - confirm before relying on it.
        private CreateCollectionOperation CreateCreateCollectionOperation(string name, CreateCollectionOptions options)
        {
            options = options ?? new CreateCollectionOptions();
            var messageEncoderSettings = GetMessageEncoderSettings();

#pragma warning disable 618
            return new CreateCollectionOperation(new CollectionNamespace(_databaseNamespace, name), messageEncoderSettings)
            {
                AutoIndexId = options.AutoIndexId,
                Collation = options.Collation,
                Capped = options.Capped,
                MaxDocuments = options.MaxDocuments,
                MaxSize = options.MaxSize,
                NoPadding = options.NoPadding,
                StorageEngine = options.StorageEngine,
                UsePowerOf2Sizes = options.UsePowerOf2Sizes,
                WriteConcern = _settings.WriteConcern
            };
#pragma warning restore 618
        }

        /// <summary>
        /// Builds the create-collection operation from generic options. When a validator is set it
        /// is rendered with the options' serializer/registry, falling back to the global
        /// <see cref="BsonSerializer.SerializerRegistry"/> and its serializer for TDocument.
        /// </summary>
        private CreateCollectionOperation CreateCreateCollectionOperation<TDocument>(string name, CreateCollectionOptions<TDocument> options)
        {
            var messageEncoderSettings = GetMessageEncoderSettings();
            BsonDocument validator = null;
            if (options.Validator != null)
            {
                var serializerRegistry = options.SerializerRegistry ?? BsonSerializer.SerializerRegistry;
                var documentSerializer = options.DocumentSerializer ?? serializerRegistry.GetSerializer<TDocument>();
                validator = options.Validator.Render(documentSerializer, serializerRegistry);
            }

#pragma warning disable 618
            return new CreateCollectionOperation(new CollectionNamespace(_databaseNamespace, name), messageEncoderSettings)
            {
                AutoIndexId = options.AutoIndexId,
                Capped = options.Capped,
                Collation = options.Collation,
                IndexOptionDefaults = options.IndexOptionDefaults?.ToBsonDocument(),
                MaxDocuments = options.MaxDocuments,
                MaxSize = options.MaxSize,
                NoPadding = options.NoPadding,
                StorageEngine = options.StorageEngine,
                UsePowerOf2Sizes = options.UsePowerOf2Sizes,
                ValidationAction = options.ValidationAction,
                ValidationLevel = options.ValidationLevel,
                Validator = validator,
                WriteConcern = _settings.WriteConcern
            };
#pragma warning restore 618
        }

        /// <summary>Builds the create-view operation, rendering the pipeline into its stage documents.</summary>
        private CreateViewOperation CreateCreateViewOperation<TDocument, TResult>(string viewName, string viewOn, PipelineDefinition<TDocument, TResult> pipeline, CreateViewOptions<TDocument> options)
        {
            var serializerRegistry = options.SerializerRegistry ?? BsonSerializer.SerializerRegistry;
            var documentSerializer = options.DocumentSerializer ?? serializerRegistry.GetSerializer<TDocument>();
            var pipelineDocuments = pipeline.Render(documentSerializer, serializerRegistry).Documents;
            return new CreateViewOperation(_databaseNamespace, viewName, viewOn, pipelineDocuments, GetMessageEncoderSettings())
            {
                Collation = options.Collation,
                WriteConcern = _settings.WriteConcern
            };
        }

        /// <summary>Builds the drop-collection operation using this database's write concern.</summary>
        private DropCollectionOperation CreateDropCollectionOperation(string name)
        {
            var collectionNamespace = new CollectionNamespace(_databaseNamespace, name);
            var messageEncoderSettings = GetMessageEncoderSettings();
            return new DropCollectionOperation(collectionNamespace, messageEncoderSettings)
            {
                WriteConcern = _settings.WriteConcern
            };
        }

        /// <summary>Builds a names-only list-collections operation; a null filter (or null options) is passed through as null.</summary>
        private ListCollectionsOperation CreateListCollectionNamesOperation(ListCollectionNamesOptions options)
        {
            var messageEncoderSettings = GetMessageEncoderSettings();
            return new ListCollectionsOperation(_databaseNamespace, messageEncoderSettings)
            {
                Filter = options?.Filter?.Render(_settings.SerializerRegistry.GetSerializer<BsonDocument>(), _settings.SerializerRegistry),
                NameOnly = true
            };
        }

        /// <summary>Builds a list-collections operation; a null filter (or null options) is passed through as null.</summary>
        private ListCollectionsOperation CreateListCollectionsOperation(ListCollectionsOptions options)
        {
            var messageEncoderSettings = GetMessageEncoderSettings();
            return new ListCollectionsOperation(_databaseNamespace, messageEncoderSettings)
            {
                Filter = options?.Filter?.Render(_settings.SerializerRegistry.GetSerializer<BsonDocument>(), _settings.SerializerRegistry)
            };
        }

        /// <summary>
        /// Creates a read binding over a fork of the session's core session.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The session is in a transaction and the read preference mode is not primary.
        /// </exception>
        private IReadBinding CreateReadBinding(IClientSessionHandle session, ReadPreference readPreference)
        {
            if (session.IsInTransaction && readPreference.ReadPreferenceMode != ReadPreferenceMode.Primary)
            {
                throw new InvalidOperationException("Read preference in a transaction must be primary.");
            }

            var binding = new ReadPreferenceBinding(_cluster, readPreference, session.WrappedCoreSession.Fork());
            return new ReadBindingHandle(binding);
        }

        /// <summary>Creates a writable-server binding over a fork of the session's core session.</summary>
        private IWriteBindingHandle CreateReadWriteBinding(IClientSessionHandle session)
        {
            var binding = new WritableServerBinding(_cluster, session.WrappedCoreSession.Fork());
            return new ReadWriteBindingHandle(binding);
        }

        /// <summary>Builds the rename operation; both names are resolved inside this database's namespace.</summary>
        private RenameCollectionOperation CreateRenameCollectionOperation(string oldName, string newName, RenameCollectionOptions options)
        {
            var messageEncoderSettings = GetMessageEncoderSettings();
            return new RenameCollectionOperation(
                new CollectionNamespace(_databaseNamespace, oldName),
                new CollectionNamespace(_databaseNamespace, newName),
                messageEncoderSettings)
            {
                DropTarget = options.DropTarget,
                WriteConcern = _settings.WriteConcern
            };
        }

        /// <summary>Renders the command with this database's serializer registry and wraps it in a read command operation.</summary>
        private ReadCommandOperation<TResult> CreateRunCommandOperation<TResult>(Command<TResult> command)
        {
            var renderedCommand = command.Render(_settings.SerializerRegistry);
            var messageEncoderSettings = GetMessageEncoderSettings();
            return new ReadCommandOperation<TResult>(_databaseNamespace, renderedCommand.Document, renderedCommand.ResultSerializer, messageEncoderSettings);
        }

        /// <summary>Delegates to <c>ChangeStreamHelper</c>, passing this database, its read concern and encoder settings.</summary>
        private ChangeStreamOperation<TResult> CreateChangeStreamOperation<TResult>(
            PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
            ChangeStreamOptions options)
        {
            return ChangeStreamHelper.CreateChangeStreamOperation(this, pipeline, options, _settings.ReadConcern, GetMessageEncoderSettings());
        }

        /// <summary>Projects each collection document to the string value of its "name" element (lazily).</summary>
        private IEnumerable<string> ExtractCollectionNames(IEnumerable<BsonDocument> collections)
        {
            return collections.Select(collection => collection["name"].AsString);
        }

        /// <summary>Executes a read operation with the read preference resolved from the session and this database's settings.</summary>
        private T ExecuteReadOperation<T>(IClientSessionHandle session, IReadOperation<T> operation, CancellationToken cancellationToken)
        {
            var readPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, _settings.ReadPreference);
            return ExecuteReadOperation(session, operation, readPreference, cancellationToken);
        }

        /// <summary>Executes a read operation on a read binding that is disposed when the call returns.</summary>
        private T ExecuteReadOperation<T>(IClientSessionHandle session, IReadOperation<T> operation, ReadPreference readPreference, CancellationToken cancellationToken)
        {
            using (var binding = CreateReadBinding(session, readPreference))
            {
                return _operationExecutor.ExecuteReadOperation(binding, operation, cancellationToken);
            }
        }

        /// <summary>Asynchronous variant that resolves the read preference from the session and this database's settings.</summary>
        private Task<T> ExecuteReadOperationAsync<T>(IClientSessionHandle session, IReadOperation<T> operation, CancellationToken cancellationToken)
        {
            var readPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, _settings.ReadPreference);
            return ExecuteReadOperationAsync(session, operation, readPreference, cancellationToken);
        }

        /// <summary>Asynchronously executes a read operation; the binding is disposed once the awaited call completes.</summary>
        private async Task<T> ExecuteReadOperationAsync<T>(IClientSessionHandle session, IReadOperation<T> operation, ReadPreference readPreference, CancellationToken cancellationToken)
        {
            using (var binding = CreateReadBinding(session, readPreference))
            {
                return await _operationExecutor.ExecuteReadOperationAsync(binding, operation, cancellationToken).ConfigureAwait(false);
            }
        }

        /// <summary>Executes a write operation on a read-write binding that is disposed when the call returns.</summary>
        private T ExecuteWriteOperation<T>(IClientSessionHandle session, IWriteOperation<T> operation, CancellationToken cancellationToken)
        {
            using (var binding = CreateReadWriteBinding(session))
            {
                return _operationExecutor.ExecuteWriteOperation(binding, operation, cancellationToken);
            }
        }

        /// <summary>Asynchronously executes a write operation; the binding is disposed once the awaited call completes.</summary>
        private async Task<T> ExecuteWriteOperationAsync<T>(IClientSessionHandle session, IWriteOperation<T> operation, CancellationToken cancellationToken)
        {
            using (var binding = CreateReadWriteBinding(session))
            {
                return await _operationExecutor.ExecuteWriteOperationAsync(binding, operation, cancellationToken).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Finds the TDocument of the closed <c>CreateCollectionOptions&lt;TDocument&gt;</c> that the
        /// runtime type of <paramref name="options"/> is or derives from.
        /// </summary>
        /// <exception cref="ArgumentException">No such generic type is found in the base-type chain.</exception>
        private static Type GetCreateCollectionDocumentType(CreateCollectionOptions options)
        {
            // Walk the inheritance chain so that types derived from the generic options class are
            // handled too, instead of indexing into an empty generic-argument array.
            for (var type = options.GetType(); type != null; type = type.GetTypeInfo().BaseType)
            {
                var typeInfo = type.GetTypeInfo();
                if (typeInfo.IsGenericType && typeInfo.GetGenericTypeDefinition() == typeof(CreateCollectionOptions<>))
                {
                    return typeInfo.GetGenericArguments()[0];
                }
            }

            throw new ArgumentException(
                $"Options of type {options.GetType().FullName} do not derive from CreateCollectionOptions<TDocument>.",
                nameof(options));
        }

        /// <summary>
        /// Builds the message encoder settings from this database's settings; strict UTF-8 is used
        /// when no read or write encoding is configured.
        /// </summary>
        private MessageEncoderSettings GetMessageEncoderSettings()
        {
            return new MessageEncoderSettings
            {
                { MessageEncoderSettingsName.GuidRepresentation, _settings.GuidRepresentation },
                { MessageEncoderSettingsName.ReadEncoding, _settings.ReadEncoding ?? Utf8Encodings.Strict },
                { MessageEncoderSettingsName.WriteEncoding, _settings.WriteEncoding ?? Utf8Encodings.Strict }
            };
        }

        /// <summary>
        /// Invokes the generic create-collection helper named <paramref name="helperName"/>, closed
        /// over the document type of <paramref name="options"/>, and returns whatever it returns
        /// (null for the void synchronous helper).
        /// </summary>
        /// <remarks>
        /// Reflection wraps exceptions thrown by the target in <see cref="TargetInvocationException"/>;
        /// the inner exception is rethrown here (stack trace preserved) so callers observe the same
        /// exception types as on the non-reflective paths.
        /// </remarks>
        private object InvokeCreateCollectionHelper(string helperName, IClientSessionHandle session, string name, CreateCollectionOptions options, CancellationToken cancellationToken)
        {
            var documentType = GetCreateCollectionDocumentType(options);
            var genericMethodDefinition = typeof(MongoDatabaseImpl).GetTypeInfo().GetMethod(helperName, BindingFlags.NonPublic | BindingFlags.Instance);
            var methodInfo = genericMethodDefinition.MakeGenericMethod(documentType);

            try
            {
                return methodInfo.Invoke(this, new object[] { session, name, options, cancellationToken });
            }
            catch (TargetInvocationException exception) when (exception.InnerException != null)
            {
                ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
                throw; // unreachable: Throw() never returns, but the compiler cannot know that
            }
        }

        /// <summary>Runs <paramref name="func"/> with an implicit session that is disposed afterwards.</summary>
        private void UsingImplicitSession(Action<IClientSessionHandle> func, CancellationToken cancellationToken)
        {
            using (var session = _operationExecutor.StartImplicitSession(cancellationToken))
            {
                func(session);
            }
        }

        /// <summary>Runs <paramref name="func"/> with an implicit session that is disposed afterwards and returns its result.</summary>
        private TResult UsingImplicitSession<TResult>(Func<IClientSessionHandle, TResult> func, CancellationToken cancellationToken)
        {
            using (var session = _operationExecutor.StartImplicitSession(cancellationToken))
            {
                return func(session);
            }
        }

        /// <summary>Awaits <paramref name="funcAsync"/> with an implicit session that is disposed once it completes.</summary>
        private async Task UsingImplicitSessionAsync(Func<IClientSessionHandle, Task> funcAsync, CancellationToken cancellationToken)
        {
            using (var session = await _operationExecutor.StartImplicitSessionAsync(cancellationToken).ConfigureAwait(false))
            {
                await funcAsync(session).ConfigureAwait(false);
            }
        }

        /// <summary>Awaits <paramref name="funcAsync"/> with an implicit session that is disposed once it completes and returns its result.</summary>
        private async Task<TResult> UsingImplicitSessionAsync<TResult>(Func<IClientSessionHandle, Task<TResult>> funcAsync, CancellationToken cancellationToken)
        {
            using (var session = await _operationExecutor.StartImplicitSessionAsync(cancellationToken).ConfigureAwait(false))
            {
                return await funcAsync(session).ConfigureAwait(false);
            }
        }
    }
}
|