MongoClient.cs 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. /* Copyright 2010-present MongoDB Inc.
  2. *
  3. * Licensed under the Apache License, Version 2.0 (the "License");
  4. * you may not use this file except in compliance with the License.
  5. * You may obtain a copy of the License at
  6. *
  7. * http://www.apache.org/licenses/LICENSE-2.0
  8. *
  9. * Unless required by applicable law or agreed to in writing, software
  10. * distributed under the License is distributed on an "AS IS" BASIS,
  11. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. * See the License for the specific language governing permissions and
  13. * limitations under the License.
  14. */
  15. using System;
  16. using System.Collections.Generic;
  17. using System.Linq;
  18. using System.Threading;
  19. using System.Threading.Tasks;
  20. using MongoDB.Bson;
  21. using MongoDB.Bson.IO;
  22. using MongoDB.Bson.Serialization;
  23. using MongoDB.Bson.Serialization.Serializers;
  24. using MongoDB.Driver.Core.Bindings;
  25. using MongoDB.Driver.Core.Clusters;
  26. using MongoDB.Driver.Core.Clusters.ServerSelectors;
  27. using MongoDB.Driver.Core.Misc;
  28. using MongoDB.Driver.Core.Operations;
  29. using MongoDB.Driver.Core.Servers;
  30. using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
  31. namespace MongoDB.Driver
  32. {
  33. /// <inheritdoc/>
  34. public class MongoClient : MongoClientBase
  35. {
  36. #region static
  37. // private static methods
  38. private static IEnumerable<ServerDescription> SelectServersThatDetermineWhetherSessionsAreSupported(ClusterConnectionMode connectionMode, IEnumerable<ServerDescription> servers)
  39. {
  40. var connectedServers = servers.Where(s => s.State == ServerState.Connected);
  41. if (connectionMode == ClusterConnectionMode.Direct)
  42. {
  43. return connectedServers;
  44. }
  45. else
  46. {
  47. return connectedServers.Where(s => s.IsDataBearing);
  48. }
  49. }
  50. #endregion
  51. // private fields
  52. private readonly ICluster _cluster;
  53. private readonly IOperationExecutor _operationExecutor;
  54. private readonly MongoClientSettings _settings;
  55. // constructors
  56. /// <summary>
  57. /// Initializes a new instance of the MongoClient class.
  58. /// </summary>
  59. public MongoClient()
  60. : this(new MongoClientSettings())
  61. {
  62. }
  63. /// <summary>
  64. /// Initializes a new instance of the MongoClient class.
  65. /// In .NET Standard, authenticating via SCRAM-SHA-256 may not work with non-ASCII passwords because SaslPrep is
  66. /// not fully implemented due to the lack of a string normalization function in .NET Standard 1.5.
  67. /// Normalizing the password into Unicode Normalization Form KC beforehand MAY help.
  68. /// SCRAM-SHA-1 is the recommended alternative for now.
  69. /// </summary>
  70. /// <param name="settings">The settings.</param>
  71. public MongoClient(MongoClientSettings settings)
  72. {
  73. _settings = Ensure.IsNotNull(settings, nameof(settings)).FrozenCopy();
  74. _cluster = ClusterRegistry.Instance.GetOrCreateCluster(_settings.ToClusterKey());
  75. _operationExecutor = new OperationExecutor(this);
  76. }
  77. /// <summary>
  78. /// Initializes a new instance of the MongoClient class.
  79. /// In .NET Standard, authenticating via SCRAM-SHA-256 may not work with non-ASCII passwords because SaslPrep is
  80. /// not fully implemented due to the lack of a string normalization function in .NET Standard 1.5.
  81. /// Normalizing the password into Unicode Normalization Form KC beforehand MAY help.
  82. /// SCRAM-SHA-1 is the recommended alternative for now.
  83. /// </summary>
  84. /// <param name="url">The URL.</param>
  85. public MongoClient(MongoUrl url)
  86. : this(MongoClientSettings.FromUrl(url))
  87. {
  88. }
  89. /// <summary>
  90. /// Initializes a new instance of the MongoClient class.
  91. /// In .NET Standard, authenticating via SCRAM-SHA-256 may not work with non-ASCII passwords because SaslPrep is
  92. /// not fully implemented due to the lack of a string normalization function in .NET Standard 1.5.
  93. /// Normalizing the password into Unicode Normalization Form KC beforehand MAY help.
  94. /// SCRAM-SHA-1 is the recommended alternative for now.
  95. /// </summary>
  96. /// <param name="connectionString">The connection string.</param>
  97. public MongoClient(string connectionString)
  98. : this(MongoClientSettings.FromConnectionString(connectionString))
  99. {
  100. }
  101. internal MongoClient(IOperationExecutor operationExecutor, MongoClientSettings settings)
  102. : this(settings)
  103. {
  104. _operationExecutor = operationExecutor;
  105. }
  106. // public properties
  107. /// <summary>
  108. /// Gets the cluster.
  109. /// </summary>
  110. public override ICluster Cluster
  111. {
  112. get { return _cluster; }
  113. }
  114. /// <inheritdoc/>
  115. public sealed override MongoClientSettings Settings
  116. {
  117. get { return _settings; }
  118. }
  119. // internal properties
  120. internal IOperationExecutor OperationExecutor => _operationExecutor;
  121. // private static methods
  122. // public methods
  123. /// <inheritdoc/>
  124. public sealed override void DropDatabase(string name, CancellationToken cancellationToken = default(CancellationToken))
  125. {
  126. UsingImplicitSession(session => DropDatabase(session, name, cancellationToken), cancellationToken);
  127. }
  128. /// <inheritdoc/>
  129. public sealed override void DropDatabase(IClientSessionHandle session, string name, CancellationToken cancellationToken = default(CancellationToken))
  130. {
  131. Ensure.IsNotNull(session, nameof(session));
  132. var messageEncoderSettings = GetMessageEncoderSettings();
  133. var operation = new DropDatabaseOperation(new DatabaseNamespace(name), messageEncoderSettings)
  134. {
  135. WriteConcern = _settings.WriteConcern
  136. };
  137. ExecuteWriteOperation(session, operation, cancellationToken);
  138. }
  139. /// <inheritdoc/>
  140. public sealed override Task DropDatabaseAsync(string name, CancellationToken cancellationToken = default(CancellationToken))
  141. {
  142. return UsingImplicitSessionAsync(session => DropDatabaseAsync(session, name, cancellationToken), cancellationToken);
  143. }
  144. /// <inheritdoc/>
  145. public sealed override Task DropDatabaseAsync(IClientSessionHandle session, string name, CancellationToken cancellationToken = default(CancellationToken))
  146. {
  147. Ensure.IsNotNull(session, nameof(session));
  148. var messageEncoderSettings = GetMessageEncoderSettings();
  149. var operation = new DropDatabaseOperation(new DatabaseNamespace(name), messageEncoderSettings)
  150. {
  151. WriteConcern = _settings.WriteConcern
  152. };
  153. return ExecuteWriteOperationAsync(session, operation, cancellationToken);
  154. }
  155. /// <inheritdoc/>
  156. public sealed override IMongoDatabase GetDatabase(string name, MongoDatabaseSettings settings = null)
  157. {
  158. settings = settings == null ?
  159. new MongoDatabaseSettings() :
  160. settings.Clone();
  161. settings.ApplyDefaultValues(_settings);
  162. return new MongoDatabaseImpl(this, new DatabaseNamespace(name), settings, _cluster, _operationExecutor);
  163. }
  164. /// <inheritdoc />
  165. public sealed override IAsyncCursor<string> ListDatabaseNames(
  166. CancellationToken cancellationToken = default(CancellationToken))
  167. {
  168. return UsingImplicitSession(session => ListDatabaseNames(session, cancellationToken), cancellationToken);
  169. }
  170. /// <inheritdoc />
  171. public sealed override IAsyncCursor<string> ListDatabaseNames(
  172. IClientSessionHandle session,
  173. CancellationToken cancellationToken = default(CancellationToken))
  174. {
  175. var options = new ListDatabasesOptions { NameOnly = true };
  176. var databases = ListDatabases(session, options, cancellationToken);
  177. return CreateDatabaseNamesCursor(databases);
  178. }
  179. /// <inheritdoc />
  180. public sealed override Task<IAsyncCursor<string>> ListDatabaseNamesAsync(
  181. CancellationToken cancellationToken = default(CancellationToken))
  182. {
  183. return UsingImplicitSessionAsync(session => ListDatabaseNamesAsync(session, cancellationToken), cancellationToken);
  184. }
  185. /// <inheritdoc />
  186. public sealed override async Task<IAsyncCursor<string>> ListDatabaseNamesAsync(
  187. IClientSessionHandle session,
  188. CancellationToken cancellationToken = default(CancellationToken))
  189. {
  190. var options = new ListDatabasesOptions { NameOnly = true };
  191. var databases = await ListDatabasesAsync(session, options, cancellationToken).ConfigureAwait(false);
  192. return CreateDatabaseNamesCursor(databases);
  193. }
  194. /// <inheritdoc/>
  195. public sealed override IAsyncCursor<BsonDocument> ListDatabases(
  196. CancellationToken cancellationToken = default(CancellationToken))
  197. {
  198. return UsingImplicitSession(session => ListDatabases(session, cancellationToken), cancellationToken);
  199. }
  200. /// <inheritdoc/>
  201. public sealed override IAsyncCursor<BsonDocument> ListDatabases(
  202. ListDatabasesOptions options,
  203. CancellationToken cancellationToken = default(CancellationToken))
  204. {
  205. return UsingImplicitSession(session => ListDatabases(session, options, cancellationToken), cancellationToken);
  206. }
  207. /// <inheritdoc/>
  208. public sealed override IAsyncCursor<BsonDocument> ListDatabases(
  209. IClientSessionHandle session,
  210. CancellationToken cancellationToken = default(CancellationToken))
  211. {
  212. return ListDatabases(session, null, cancellationToken);
  213. }
  214. /// <inheritdoc/>
  215. public sealed override IAsyncCursor<BsonDocument> ListDatabases(
  216. IClientSessionHandle session,
  217. ListDatabasesOptions options,
  218. CancellationToken cancellationToken = default(CancellationToken))
  219. {
  220. Ensure.IsNotNull(session, nameof(session));
  221. options = options ?? new ListDatabasesOptions();
  222. var messageEncoderSettings = GetMessageEncoderSettings();
  223. var operation = CreateListDatabaseOperation(options, messageEncoderSettings);
  224. return ExecuteReadOperation(session, operation, cancellationToken);
  225. }
  226. /// <inheritdoc/>
  227. public sealed override Task<IAsyncCursor<BsonDocument>> ListDatabasesAsync(
  228. CancellationToken cancellationToken = default(CancellationToken))
  229. {
  230. return UsingImplicitSessionAsync(session => ListDatabasesAsync(session, null, cancellationToken), cancellationToken);
  231. }
  232. /// <inheritdoc/>
  233. public sealed override Task<IAsyncCursor<BsonDocument>> ListDatabasesAsync(
  234. ListDatabasesOptions options,
  235. CancellationToken cancellationToken = default(CancellationToken))
  236. {
  237. return UsingImplicitSessionAsync(session => ListDatabasesAsync(session, options, cancellationToken), cancellationToken);
  238. }
  239. /// <inheritdoc/>
  240. public sealed override Task<IAsyncCursor<BsonDocument>> ListDatabasesAsync(
  241. IClientSessionHandle session,
  242. CancellationToken cancellationToken = default(CancellationToken))
  243. {
  244. return ListDatabasesAsync(session, null, cancellationToken);
  245. }
  246. /// <inheritdoc/>
  247. public sealed override Task<IAsyncCursor<BsonDocument>> ListDatabasesAsync(
  248. IClientSessionHandle session,
  249. ListDatabasesOptions options,
  250. CancellationToken cancellationToken = default(CancellationToken))
  251. {
  252. Ensure.IsNotNull(session, nameof(session));
  253. options = options ?? new ListDatabasesOptions();
  254. var messageEncoderSettings = GetMessageEncoderSettings();
  255. var operation = CreateListDatabaseOperation(options, messageEncoderSettings);
  256. return ExecuteReadOperationAsync(session, operation, cancellationToken);
  257. }
  258. /// <summary>
  259. /// Starts an implicit session.
  260. /// </summary>
  261. /// <returns>A session.</returns>
  262. internal IClientSessionHandle StartImplicitSession(CancellationToken cancellationToken)
  263. {
  264. var areSessionsSupported = AreSessionsSupported(cancellationToken);
  265. return StartImplicitSession(areSessionsSupported);
  266. }
  267. /// <summary>
  268. /// Starts an implicit session.
  269. /// </summary>
  270. /// <returns>A Task whose result is a session.</returns>
  271. internal async Task<IClientSessionHandle> StartImplicitSessionAsync(CancellationToken cancellationToken)
  272. {
  273. var areSessionsSupported = await AreSessionsSupportedAsync(cancellationToken).ConfigureAwait(false);
  274. return StartImplicitSession(areSessionsSupported);
  275. }
  276. /// <inheritdoc/>
  277. public sealed override IClientSessionHandle StartSession(ClientSessionOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  278. {
  279. var areSessionsSupported = AreSessionsSupported(cancellationToken);
  280. return StartSession(options, areSessionsSupported);
  281. }
  282. /// <inheritdoc/>
  283. public sealed override async Task<IClientSessionHandle> StartSessionAsync(ClientSessionOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  284. {
  285. var areSessionsSupported = await AreSessionsSupportedAsync(cancellationToken).ConfigureAwait(false);
  286. return StartSession(options, areSessionsSupported);
  287. }
  288. /// <inheritdoc/>
  289. public override IAsyncCursor<TResult> Watch<TResult>(
  290. PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
  291. ChangeStreamOptions options = null,
  292. CancellationToken cancellationToken = default(CancellationToken))
  293. {
  294. return UsingImplicitSession(session => Watch(session, pipeline, options, cancellationToken), cancellationToken);
  295. }
  296. /// <inheritdoc/>
  297. public override IAsyncCursor<TResult> Watch<TResult>(
  298. IClientSessionHandle session,
  299. PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
  300. ChangeStreamOptions options = null,
  301. CancellationToken cancellationToken = default(CancellationToken))
  302. {
  303. Ensure.IsNotNull(session, nameof(session));
  304. Ensure.IsNotNull(pipeline, nameof(pipeline));
  305. var operation = CreateChangeStreamOperation(pipeline, options);
  306. return ExecuteReadOperation(session, operation, cancellationToken);
  307. }
  308. /// <inheritdoc/>
  309. public override Task<IAsyncCursor<TResult>> WatchAsync<TResult>(
  310. PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
  311. ChangeStreamOptions options = null,
  312. CancellationToken cancellationToken = default(CancellationToken))
  313. {
  314. return UsingImplicitSessionAsync(session => WatchAsync(session, pipeline, options, cancellationToken), cancellationToken);
  315. }
  316. /// <inheritdoc/>
  317. public override Task<IAsyncCursor<TResult>> WatchAsync<TResult>(
  318. IClientSessionHandle session,
  319. PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
  320. ChangeStreamOptions options = null,
  321. CancellationToken cancellationToken = default(CancellationToken))
  322. {
  323. Ensure.IsNotNull(session, nameof(session));
  324. Ensure.IsNotNull(pipeline, nameof(pipeline));
  325. var operation = CreateChangeStreamOperation(pipeline, options);
  326. return ExecuteReadOperationAsync(session, operation, cancellationToken);
  327. }
  328. /// <inheritdoc/>
  329. public override IMongoClient WithReadConcern(ReadConcern readConcern)
  330. {
  331. Ensure.IsNotNull(readConcern, nameof(readConcern));
  332. var newSettings = Settings.Clone();
  333. newSettings.ReadConcern = readConcern;
  334. return new MongoClient(_operationExecutor, newSettings);
  335. }
  336. /// <inheritdoc/>
  337. public override IMongoClient WithReadPreference(ReadPreference readPreference)
  338. {
  339. Ensure.IsNotNull(readPreference, nameof(readPreference));
  340. var newSettings = Settings.Clone();
  341. newSettings.ReadPreference = readPreference;
  342. return new MongoClient(_operationExecutor, newSettings);
  343. }
  344. /// <inheritdoc/>
  345. public override IMongoClient WithWriteConcern(WriteConcern writeConcern)
  346. {
  347. Ensure.IsNotNull(writeConcern, nameof(writeConcern));
  348. var newSettings = Settings.Clone();
  349. newSettings.WriteConcern = writeConcern;
  350. return new MongoClient(_operationExecutor, newSettings);
  351. }
  352. // private methods
  353. private bool AreSessionsSupported(CancellationToken cancellationToken)
  354. {
  355. return AreSessionsSupported(_cluster.Description) ?? AreSessionsSupportedAfterServerSelection(cancellationToken);
  356. }
  357. private async Task<bool> AreSessionsSupportedAsync(CancellationToken cancellationToken)
  358. {
  359. return AreSessionsSupported(_cluster.Description) ?? await AreSessionsSupportedAfterSeverSelctionAsync(cancellationToken).ConfigureAwait(false);
  360. }
  361. private bool? AreSessionsSupported(ClusterDescription clusterDescription)
  362. {
  363. if (clusterDescription.LogicalSessionTimeout.HasValue)
  364. {
  365. return true;
  366. }
  367. else
  368. {
  369. var selectedServers = SelectServersThatDetermineWhetherSessionsAreSupported(clusterDescription.ConnectionMode, clusterDescription.Servers).ToList();
  370. if (selectedServers.Count == 0)
  371. {
  372. return null;
  373. }
  374. else
  375. {
  376. return false;
  377. }
  378. }
  379. }
  380. private bool AreSessionsSupportedAfterServerSelection(CancellationToken cancellationToken)
  381. {
  382. var selector = new AreSessionsSupportedServerSelector();
  383. var selectedServer = _cluster.SelectServer(selector, cancellationToken);
  384. return AreSessionsSupported(selector.ClusterDescription) ?? false;
  385. }
  386. private async Task<bool> AreSessionsSupportedAfterSeverSelctionAsync(CancellationToken cancellationToken)
  387. {
  388. var selector = new AreSessionsSupportedServerSelector();
  389. var selectedServer = await _cluster.SelectServerAsync(selector, cancellationToken).ConfigureAwait(false);
  390. return AreSessionsSupported(selector.ClusterDescription) ?? false;
  391. }
  392. private IAsyncCursor<string> CreateDatabaseNamesCursor(IAsyncCursor<BsonDocument> cursor)
  393. {
  394. return new BatchTransformingAsyncCursor<BsonDocument, string>(
  395. cursor,
  396. databases => databases.Select(database => database["name"].AsString));
  397. }
  398. private ListDatabasesOperation CreateListDatabaseOperation(
  399. ListDatabasesOptions options,
  400. MessageEncoderSettings messageEncoderSettings)
  401. {
  402. return new ListDatabasesOperation(messageEncoderSettings)
  403. {
  404. Filter = options.Filter?.Render(BsonDocumentSerializer.Instance, BsonSerializer.SerializerRegistry),
  405. NameOnly = options.NameOnly
  406. };
  407. }
  408. private IReadBindingHandle CreateReadBinding(IClientSessionHandle session)
  409. {
  410. var readPreference = _settings.ReadPreference;
  411. if (session.IsInTransaction && readPreference.ReadPreferenceMode != ReadPreferenceMode.Primary)
  412. {
  413. throw new InvalidOperationException("Read preference in a transaction must be primary.");
  414. }
  415. var binding = new ReadPreferenceBinding(_cluster, readPreference, session.WrappedCoreSession.Fork());
  416. return new ReadBindingHandle(binding);
  417. }
  418. private IReadWriteBindingHandle CreateReadWriteBinding(IClientSessionHandle session)
  419. {
  420. var binding = new WritableServerBinding(_cluster, session.WrappedCoreSession.Fork());
  421. return new ReadWriteBindingHandle(binding);
  422. }
  423. private ChangeStreamOperation<TResult> CreateChangeStreamOperation<TResult>(
  424. PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
  425. ChangeStreamOptions options)
  426. {
  427. return ChangeStreamHelper.CreateChangeStreamOperation(pipeline, options, _settings.ReadConcern, GetMessageEncoderSettings());
  428. }
  429. private TResult ExecuteReadOperation<TResult>(IClientSessionHandle session, IReadOperation<TResult> operation, CancellationToken cancellationToken = default(CancellationToken))
  430. {
  431. using (var binding = CreateReadBinding(session))
  432. {
  433. return _operationExecutor.ExecuteReadOperation(binding, operation, cancellationToken);
  434. }
  435. }
  436. private async Task<TResult> ExecuteReadOperationAsync<TResult>(IClientSessionHandle session, IReadOperation<TResult> operation, CancellationToken cancellationToken = default(CancellationToken))
  437. {
  438. using (var binding = CreateReadBinding(session))
  439. {
  440. return await _operationExecutor.ExecuteReadOperationAsync(binding, operation, cancellationToken).ConfigureAwait(false);
  441. }
  442. }
  443. private TResult ExecuteWriteOperation<TResult>(IClientSessionHandle session, IWriteOperation<TResult> operation, CancellationToken cancellationToken = default(CancellationToken))
  444. {
  445. using (var binding = CreateReadWriteBinding(session))
  446. {
  447. return _operationExecutor.ExecuteWriteOperation(binding, operation, cancellationToken);
  448. }
  449. }
  450. private async Task<TResult> ExecuteWriteOperationAsync<TResult>(IClientSessionHandle session, IWriteOperation<TResult> operation, CancellationToken cancellationToken = default(CancellationToken))
  451. {
  452. using (var binding = CreateReadWriteBinding(session))
  453. {
  454. return await _operationExecutor.ExecuteWriteOperationAsync(binding, operation, cancellationToken).ConfigureAwait(false);
  455. }
  456. }
  457. private MessageEncoderSettings GetMessageEncoderSettings()
  458. {
  459. return new MessageEncoderSettings
  460. {
  461. { MessageEncoderSettingsName.GuidRepresentation, _settings.GuidRepresentation },
  462. { MessageEncoderSettingsName.ReadEncoding, _settings.ReadEncoding ?? Utf8Encodings.Strict },
  463. { MessageEncoderSettingsName.WriteEncoding, _settings.WriteEncoding ?? Utf8Encodings.Strict }
  464. };
  465. }
  466. private IClientSessionHandle StartImplicitSession(bool areSessionsSupported)
  467. {
  468. var options = new ClientSessionOptions();
  469. ICoreSessionHandle coreSession;
  470. #pragma warning disable 618
  471. var areMultipleUsersAuthenticated = _settings.Credentials.Count() > 1;
  472. #pragma warning restore
  473. if (areSessionsSupported && !areMultipleUsersAuthenticated)
  474. {
  475. coreSession = _cluster.StartSession(options.ToCore(isImplicit: true));
  476. }
  477. else
  478. {
  479. coreSession = NoCoreSession.NewHandle();
  480. }
  481. return new ClientSessionHandle(this, options, coreSession);
  482. }
  483. private IClientSessionHandle StartSession(ClientSessionOptions options, bool areSessionsSupported)
  484. {
  485. if (!areSessionsSupported)
  486. {
  487. throw new NotSupportedException("Sessions are not supported by this version of the server.");
  488. }
  489. options = options ?? new ClientSessionOptions();
  490. var coreSession = _cluster.StartSession(options.ToCore());
  491. return new ClientSessionHandle(this, options, coreSession);
  492. }
  493. private void UsingImplicitSession(Action<IClientSessionHandle> func, CancellationToken cancellationToken)
  494. {
  495. using (var session = StartImplicitSession(cancellationToken))
  496. {
  497. func(session);
  498. }
  499. }
  500. private TResult UsingImplicitSession<TResult>(Func<IClientSessionHandle, TResult> func, CancellationToken cancellationToken)
  501. {
  502. using (var session = StartImplicitSession(cancellationToken))
  503. {
  504. return func(session);
  505. }
  506. }
  507. private async Task UsingImplicitSessionAsync(Func<IClientSessionHandle, Task> funcAsync, CancellationToken cancellationToken)
  508. {
  509. using (var session = await StartImplicitSessionAsync(cancellationToken).ConfigureAwait(false))
  510. {
  511. await funcAsync(session).ConfigureAwait(false);
  512. }
  513. }
  514. private async Task<TResult> UsingImplicitSessionAsync<TResult>(Func<IClientSessionHandle, Task<TResult>> funcAsync, CancellationToken cancellationToken)
  515. {
  516. using (var session = await StartImplicitSessionAsync(cancellationToken).ConfigureAwait(false))
  517. {
  518. return await funcAsync(session).ConfigureAwait(false);
  519. }
  520. }
  521. // nested types
  522. private class AreSessionsSupportedServerSelector : IServerSelector
  523. {
  524. public ClusterDescription ClusterDescription;
  525. public IEnumerable<ServerDescription> SelectServers(ClusterDescription cluster, IEnumerable<ServerDescription> servers)
  526. {
  527. ClusterDescription = cluster;
  528. return SelectServersThatDetermineWhetherSessionsAreSupported(cluster.ConnectionMode, servers);
  529. }
  530. }
  531. }
  532. }