/* Copyright 2010-present MongoDB Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using MongoDB.Bson; using MongoDB.Bson.IO; using MongoDB.Bson.Serialization; using MongoDB.Bson.Serialization.Serializers; using MongoDB.Driver.Core.Bindings; using MongoDB.Driver.Core.Clusters; using MongoDB.Driver.Core.Clusters.ServerSelectors; using MongoDB.Driver.Core.Misc; using MongoDB.Driver.Core.Operations; using MongoDB.Driver.Core.Servers; using MongoDB.Driver.Core.WireProtocol.Messages.Encoders; namespace MongoDB.Driver { /// public class MongoClient : MongoClientBase { #region static // private static methods private static IEnumerable SelectServersThatDetermineWhetherSessionsAreSupported(ClusterConnectionMode connectionMode, IEnumerable servers) { var connectedServers = servers.Where(s => s.State == ServerState.Connected); if (connectionMode == ClusterConnectionMode.Direct) { return connectedServers; } else { return connectedServers.Where(s => s.IsDataBearing); } } #endregion // private fields private readonly ICluster _cluster; private readonly IOperationExecutor _operationExecutor; private readonly MongoClientSettings _settings; // constructors /// /// Initializes a new instance of the MongoClient class. /// public MongoClient() : this(new MongoClientSettings()) { } /// /// Initializes a new instance of the MongoClient class. /// In .NET Standard, authenticating via SCRAM-SHA-256 may not work with non-ASCII passwords because SaslPrep is /// not fully implemented due to the lack of a string normalization function in .NET Standard 1.5. /// Normalizing the password into Unicode Normalization Form KC beforehand MAY help. /// SCRAM-SHA-1 is the recommended alternative for now. /// /// The settings. public MongoClient(MongoClientSettings settings) { _settings = Ensure.IsNotNull(settings, nameof(settings)).FrozenCopy(); _cluster = ClusterRegistry.Instance.GetOrCreateCluster(_settings.ToClusterKey()); _operationExecutor = new OperationExecutor(this); } /// /// Initializes a new instance of the MongoClient class. /// In .NET Standard, authenticating via SCRAM-SHA-256 may not work with non-ASCII passwords because SaslPrep is /// not fully implemented due to the lack of a string normalization function in .NET Standard 1.5. /// Normalizing the password into Unicode Normalization Form KC beforehand MAY help. /// SCRAM-SHA-1 is the recommended alternative for now. /// /// The URL. public MongoClient(MongoUrl url) : this(MongoClientSettings.FromUrl(url)) { } /// /// Initializes a new instance of the MongoClient class. /// In .NET Standard, authenticating via SCRAM-SHA-256 may not work with non-ASCII passwords because SaslPrep is /// not fully implemented due to the lack of a string normalization function in .NET Standard 1.5. /// Normalizing the password into Unicode Normalization Form KC beforehand MAY help. /// SCRAM-SHA-1 is the recommended alternative for now. /// /// The connection string. public MongoClient(string connectionString) : this(MongoClientSettings.FromConnectionString(connectionString)) { } internal MongoClient(IOperationExecutor operationExecutor, MongoClientSettings settings) : this(settings) { _operationExecutor = operationExecutor; } // public properties /// /// Gets the cluster. /// public override ICluster Cluster { get { return _cluster; } } /// public sealed override MongoClientSettings Settings { get { return _settings; } } // internal properties internal IOperationExecutor OperationExecutor => _operationExecutor; // private static methods // public methods /// public sealed override void DropDatabase(string name, CancellationToken cancellationToken = default(CancellationToken)) { UsingImplicitSession(session => DropDatabase(session, name, cancellationToken), cancellationToken); } /// public sealed override void DropDatabase(IClientSessionHandle session, string name, CancellationToken cancellationToken = default(CancellationToken)) { Ensure.IsNotNull(session, nameof(session)); var messageEncoderSettings = GetMessageEncoderSettings(); var operation = new DropDatabaseOperation(new DatabaseNamespace(name), messageEncoderSettings) { WriteConcern = _settings.WriteConcern }; ExecuteWriteOperation(session, operation, cancellationToken); } /// public sealed override Task DropDatabaseAsync(string name, CancellationToken cancellationToken = default(CancellationToken)) { return UsingImplicitSessionAsync(session => DropDatabaseAsync(session, name, cancellationToken), cancellationToken); } /// public sealed override Task DropDatabaseAsync(IClientSessionHandle session, string name, CancellationToken cancellationToken = default(CancellationToken)) { Ensure.IsNotNull(session, nameof(session)); var messageEncoderSettings = GetMessageEncoderSettings(); var operation = new DropDatabaseOperation(new DatabaseNamespace(name), messageEncoderSettings) { WriteConcern = _settings.WriteConcern }; return ExecuteWriteOperationAsync(session, operation, cancellationToken); } /// public sealed override IMongoDatabase GetDatabase(string name, MongoDatabaseSettings settings = null) { settings = settings == null ? new MongoDatabaseSettings() : settings.Clone(); settings.ApplyDefaultValues(_settings); return new MongoDatabaseImpl(this, new DatabaseNamespace(name), settings, _cluster, _operationExecutor); } /// public sealed override IAsyncCursor ListDatabaseNames( CancellationToken cancellationToken = default(CancellationToken)) { return UsingImplicitSession(session => ListDatabaseNames(session, cancellationToken), cancellationToken); } /// public sealed override IAsyncCursor ListDatabaseNames( IClientSessionHandle session, CancellationToken cancellationToken = default(CancellationToken)) { var options = new ListDatabasesOptions { NameOnly = true }; var databases = ListDatabases(session, options, cancellationToken); return CreateDatabaseNamesCursor(databases); } /// public sealed override Task> ListDatabaseNamesAsync( CancellationToken cancellationToken = default(CancellationToken)) { return UsingImplicitSessionAsync(session => ListDatabaseNamesAsync(session, cancellationToken), cancellationToken); } /// public sealed override async Task> ListDatabaseNamesAsync( IClientSessionHandle session, CancellationToken cancellationToken = default(CancellationToken)) { var options = new ListDatabasesOptions { NameOnly = true }; var databases = await ListDatabasesAsync(session, options, cancellationToken).ConfigureAwait(false); return CreateDatabaseNamesCursor(databases); } /// public sealed override IAsyncCursor ListDatabases( CancellationToken cancellationToken = default(CancellationToken)) { return UsingImplicitSession(session => ListDatabases(session, cancellationToken), cancellationToken); } /// public sealed override IAsyncCursor ListDatabases( ListDatabasesOptions options, CancellationToken cancellationToken = default(CancellationToken)) { return UsingImplicitSession(session => ListDatabases(session, options, cancellationToken), cancellationToken); } /// public sealed override IAsyncCursor ListDatabases( IClientSessionHandle session, CancellationToken cancellationToken = default(CancellationToken)) { return ListDatabases(session, null, cancellationToken); } /// public sealed override IAsyncCursor ListDatabases( IClientSessionHandle session, ListDatabasesOptions options, CancellationToken cancellationToken = default(CancellationToken)) { Ensure.IsNotNull(session, nameof(session)); options = options ?? new ListDatabasesOptions(); var messageEncoderSettings = GetMessageEncoderSettings(); var operation = CreateListDatabaseOperation(options, messageEncoderSettings); return ExecuteReadOperation(session, operation, cancellationToken); } /// public sealed override Task> ListDatabasesAsync( CancellationToken cancellationToken = default(CancellationToken)) { return UsingImplicitSessionAsync(session => ListDatabasesAsync(session, null, cancellationToken), cancellationToken); } /// public sealed override Task> ListDatabasesAsync( ListDatabasesOptions options, CancellationToken cancellationToken = default(CancellationToken)) { return UsingImplicitSessionAsync(session => ListDatabasesAsync(session, options, cancellationToken), cancellationToken); } /// public sealed override Task> ListDatabasesAsync( IClientSessionHandle session, CancellationToken cancellationToken = default(CancellationToken)) { return ListDatabasesAsync(session, null, cancellationToken); } /// public sealed override Task> ListDatabasesAsync( IClientSessionHandle session, ListDatabasesOptions options, CancellationToken cancellationToken = default(CancellationToken)) { Ensure.IsNotNull(session, nameof(session)); options = options ?? new ListDatabasesOptions(); var messageEncoderSettings = GetMessageEncoderSettings(); var operation = CreateListDatabaseOperation(options, messageEncoderSettings); return ExecuteReadOperationAsync(session, operation, cancellationToken); } /// /// Starts an implicit session. /// /// A session. internal IClientSessionHandle StartImplicitSession(CancellationToken cancellationToken) { var areSessionsSupported = AreSessionsSupported(cancellationToken); return StartImplicitSession(areSessionsSupported); } /// /// Starts an implicit session. /// /// A Task whose result is a session. internal async Task StartImplicitSessionAsync(CancellationToken cancellationToken) { var areSessionsSupported = await AreSessionsSupportedAsync(cancellationToken).ConfigureAwait(false); return StartImplicitSession(areSessionsSupported); } /// public sealed override IClientSessionHandle StartSession(ClientSessionOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) { var areSessionsSupported = AreSessionsSupported(cancellationToken); return StartSession(options, areSessionsSupported); } /// public sealed override async Task StartSessionAsync(ClientSessionOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) { var areSessionsSupported = await AreSessionsSupportedAsync(cancellationToken).ConfigureAwait(false); return StartSession(options, areSessionsSupported); } /// public override IAsyncCursor Watch( PipelineDefinition, TResult> pipeline, ChangeStreamOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) { return UsingImplicitSession(session => Watch(session, pipeline, options, cancellationToken), cancellationToken); } /// public override IAsyncCursor Watch( IClientSessionHandle session, PipelineDefinition, TResult> pipeline, ChangeStreamOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) { Ensure.IsNotNull(session, nameof(session)); Ensure.IsNotNull(pipeline, nameof(pipeline)); var operation = CreateChangeStreamOperation(pipeline, options); return ExecuteReadOperation(session, operation, cancellationToken); } /// public override Task> WatchAsync( PipelineDefinition, TResult> pipeline, ChangeStreamOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) { return UsingImplicitSessionAsync(session => WatchAsync(session, pipeline, options, cancellationToken), cancellationToken); } /// public override Task> WatchAsync( IClientSessionHandle session, PipelineDefinition, TResult> pipeline, ChangeStreamOptions options = null, CancellationToken cancellationToken = default(CancellationToken)) { Ensure.IsNotNull(session, nameof(session)); Ensure.IsNotNull(pipeline, nameof(pipeline)); var operation = CreateChangeStreamOperation(pipeline, options); return ExecuteReadOperationAsync(session, operation, cancellationToken); } /// public override IMongoClient WithReadConcern(ReadConcern readConcern) { Ensure.IsNotNull(readConcern, nameof(readConcern)); var newSettings = Settings.Clone(); newSettings.ReadConcern = readConcern; return new MongoClient(_operationExecutor, newSettings); } /// public override IMongoClient WithReadPreference(ReadPreference readPreference) { Ensure.IsNotNull(readPreference, nameof(readPreference)); var newSettings = Settings.Clone(); newSettings.ReadPreference = readPreference; return new MongoClient(_operationExecutor, newSettings); } /// public override IMongoClient WithWriteConcern(WriteConcern writeConcern) { Ensure.IsNotNull(writeConcern, nameof(writeConcern)); var newSettings = Settings.Clone(); newSettings.WriteConcern = writeConcern; return new MongoClient(_operationExecutor, newSettings); } // private methods private bool AreSessionsSupported(CancellationToken cancellationToken) { return AreSessionsSupported(_cluster.Description) ?? AreSessionsSupportedAfterServerSelection(cancellationToken); } private async Task AreSessionsSupportedAsync(CancellationToken cancellationToken) { return AreSessionsSupported(_cluster.Description) ?? await AreSessionsSupportedAfterSeverSelctionAsync(cancellationToken).ConfigureAwait(false); } private bool? AreSessionsSupported(ClusterDescription clusterDescription) { if (clusterDescription.LogicalSessionTimeout.HasValue) { return true; } else { var selectedServers = SelectServersThatDetermineWhetherSessionsAreSupported(clusterDescription.ConnectionMode, clusterDescription.Servers).ToList(); if (selectedServers.Count == 0) { return null; } else { return false; } } } private bool AreSessionsSupportedAfterServerSelection(CancellationToken cancellationToken) { var selector = new AreSessionsSupportedServerSelector(); var selectedServer = _cluster.SelectServer(selector, cancellationToken); return AreSessionsSupported(selector.ClusterDescription) ?? false; } private async Task AreSessionsSupportedAfterSeverSelctionAsync(CancellationToken cancellationToken) { var selector = new AreSessionsSupportedServerSelector(); var selectedServer = await _cluster.SelectServerAsync(selector, cancellationToken).ConfigureAwait(false); return AreSessionsSupported(selector.ClusterDescription) ?? false; } private IAsyncCursor CreateDatabaseNamesCursor(IAsyncCursor cursor) { return new BatchTransformingAsyncCursor( cursor, databases => databases.Select(database => database["name"].AsString)); } private ListDatabasesOperation CreateListDatabaseOperation( ListDatabasesOptions options, MessageEncoderSettings messageEncoderSettings) { return new ListDatabasesOperation(messageEncoderSettings) { Filter = options.Filter?.Render(BsonDocumentSerializer.Instance, BsonSerializer.SerializerRegistry), NameOnly = options.NameOnly }; } private IReadBindingHandle CreateReadBinding(IClientSessionHandle session) { var readPreference = _settings.ReadPreference; if (session.IsInTransaction && readPreference.ReadPreferenceMode != ReadPreferenceMode.Primary) { throw new InvalidOperationException("Read preference in a transaction must be primary."); } var binding = new ReadPreferenceBinding(_cluster, readPreference, session.WrappedCoreSession.Fork()); return new ReadBindingHandle(binding); } private IReadWriteBindingHandle CreateReadWriteBinding(IClientSessionHandle session) { var binding = new WritableServerBinding(_cluster, session.WrappedCoreSession.Fork()); return new ReadWriteBindingHandle(binding); } private ChangeStreamOperation CreateChangeStreamOperation( PipelineDefinition, TResult> pipeline, ChangeStreamOptions options) { return ChangeStreamHelper.CreateChangeStreamOperation(pipeline, options, _settings.ReadConcern, GetMessageEncoderSettings()); } private TResult ExecuteReadOperation(IClientSessionHandle session, IReadOperation operation, CancellationToken cancellationToken = default(CancellationToken)) { using (var binding = CreateReadBinding(session)) { return _operationExecutor.ExecuteReadOperation(binding, operation, cancellationToken); } } private async Task ExecuteReadOperationAsync(IClientSessionHandle session, IReadOperation operation, CancellationToken cancellationToken = default(CancellationToken)) { using (var binding = CreateReadBinding(session)) { return await _operationExecutor.ExecuteReadOperationAsync(binding, operation, cancellationToken).ConfigureAwait(false); } } private TResult ExecuteWriteOperation(IClientSessionHandle session, IWriteOperation operation, CancellationToken cancellationToken = default(CancellationToken)) { using (var binding = CreateReadWriteBinding(session)) { return _operationExecutor.ExecuteWriteOperation(binding, operation, cancellationToken); } } private async Task ExecuteWriteOperationAsync(IClientSessionHandle session, IWriteOperation operation, CancellationToken cancellationToken = default(CancellationToken)) { using (var binding = CreateReadWriteBinding(session)) { return await _operationExecutor.ExecuteWriteOperationAsync(binding, operation, cancellationToken).ConfigureAwait(false); } } private MessageEncoderSettings GetMessageEncoderSettings() { return new MessageEncoderSettings { { MessageEncoderSettingsName.GuidRepresentation, _settings.GuidRepresentation }, { MessageEncoderSettingsName.ReadEncoding, _settings.ReadEncoding ?? Utf8Encodings.Strict }, { MessageEncoderSettingsName.WriteEncoding, _settings.WriteEncoding ?? Utf8Encodings.Strict } }; } private IClientSessionHandle StartImplicitSession(bool areSessionsSupported) { var options = new ClientSessionOptions(); ICoreSessionHandle coreSession; #pragma warning disable 618 var areMultipleUsersAuthenticated = _settings.Credentials.Count() > 1; #pragma warning restore if (areSessionsSupported && !areMultipleUsersAuthenticated) { coreSession = _cluster.StartSession(options.ToCore(isImplicit: true)); } else { coreSession = NoCoreSession.NewHandle(); } return new ClientSessionHandle(this, options, coreSession); } private IClientSessionHandle StartSession(ClientSessionOptions options, bool areSessionsSupported) { if (!areSessionsSupported) { throw new NotSupportedException("Sessions are not supported by this version of the server."); } options = options ?? new ClientSessionOptions(); var coreSession = _cluster.StartSession(options.ToCore()); return new ClientSessionHandle(this, options, coreSession); } private void UsingImplicitSession(Action func, CancellationToken cancellationToken) { using (var session = StartImplicitSession(cancellationToken)) { func(session); } } private TResult UsingImplicitSession(Func func, CancellationToken cancellationToken) { using (var session = StartImplicitSession(cancellationToken)) { return func(session); } } private async Task UsingImplicitSessionAsync(Func funcAsync, CancellationToken cancellationToken) { using (var session = await StartImplicitSessionAsync(cancellationToken).ConfigureAwait(false)) { await funcAsync(session).ConfigureAwait(false); } } private async Task UsingImplicitSessionAsync(Func> funcAsync, CancellationToken cancellationToken) { using (var session = await StartImplicitSessionAsync(cancellationToken).ConfigureAwait(false)) { return await funcAsync(session).ConfigureAwait(false); } } // nested types private class AreSessionsSupportedServerSelector : IServerSelector { public ClusterDescription ClusterDescription; public IEnumerable SelectServers(ClusterDescription cluster, IEnumerable servers) { ClusterDescription = cluster; return SelectServersThatDetermineWhetherSessionsAreSupported(cluster.ConnectionMode, servers); } } } }