MongoDatabaseImpl.cs 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622
/* Copyright 2010-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
  15. using System;
  16. using System.Collections.Generic;
  17. using System.Linq;
  18. using System.Reflection;
  19. using System.Threading;
  20. using System.Threading.Tasks;
  21. using MongoDB.Bson;
  22. using MongoDB.Bson.IO;
  23. using MongoDB.Bson.Serialization;
  24. using MongoDB.Bson.Serialization.Serializers;
  25. using MongoDB.Driver.Core.Bindings;
  26. using MongoDB.Driver.Core.Clusters;
  27. using MongoDB.Driver.Core.Misc;
  28. using MongoDB.Driver.Core.Operations;
  29. using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
  namespace MongoDB.Driver
  {
      /// <summary>
      /// Concrete <see cref="MongoDatabaseBase"/>: turns the public options types into driver operations
      /// and runs them through the supplied <see cref="IOperationExecutor"/> against the supplied
      /// <see cref="ICluster"/>. Every session-less overload starts an implicit session and delegates
      /// to the overload that takes an <see cref="IClientSessionHandle"/>.
      /// </summary>
      internal sealed class MongoDatabaseImpl : MongoDatabaseBase
      {
          // private fields
          private readonly IMongoClient _client;
          private readonly ICluster _cluster;
          private readonly DatabaseNamespace _databaseNamespace;
          private readonly IOperationExecutor _operationExecutor;
          private readonly MongoDatabaseSettings _settings;

          // constructors
          /// <summary>
          /// Initializes a new instance of the <see cref="MongoDatabaseImpl"/> class.
          /// </summary>
          /// <param name="client">The client this database belongs to.</param>
          /// <param name="databaseNamespace">The database namespace.</param>
          /// <param name="settings">The database settings; they are frozen by this constructor.</param>
          /// <param name="cluster">The cluster that bindings are created on.</param>
          /// <param name="operationExecutor">The executor used to run operations and start implicit sessions.</param>
          public MongoDatabaseImpl(IMongoClient client, DatabaseNamespace databaseNamespace, MongoDatabaseSettings settings, ICluster cluster, IOperationExecutor operationExecutor)
          {
              _client = Ensure.IsNotNull(client, nameof(client));
              _databaseNamespace = Ensure.IsNotNull(databaseNamespace, nameof(databaseNamespace));
              _settings = Ensure.IsNotNull(settings, nameof(settings)).Freeze();
              _cluster = Ensure.IsNotNull(cluster, nameof(cluster));
              _operationExecutor = Ensure.IsNotNull(operationExecutor, nameof(operationExecutor));
          }

          // public properties
          /// <inheritdoc/>
          public override IMongoClient Client => _client;

          /// <inheritdoc/>
          public override DatabaseNamespace DatabaseNamespace => _databaseNamespace;

          /// <inheritdoc/>
          public override MongoDatabaseSettings Settings => _settings;

          // public methods
          /// <inheritdoc/>
          public override void CreateCollection(string name, CreateCollectionOptions options, CancellationToken cancellationToken)
          {
              UsingImplicitSession(session => CreateCollection(session, name, options, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override void CreateCollection(IClientSessionHandle session, string name, CreateCollectionOptions options, CancellationToken cancellationToken)
          {
              Ensure.IsNotNull(session, nameof(session));
              Ensure.IsNotNullOrEmpty(name, nameof(name));

              if (options == null)
              {
                  CreateCollectionHelper<BsonDocument>(session, name, null, cancellationToken);
                  return;
              }

              if (options.GetType() == typeof(CreateCollectionOptions))
              {
                  var genericOptions = CreateCollectionOptions<BsonDocument>.CoercedFrom(options);
                  CreateCollectionHelper<BsonDocument>(session, name, genericOptions, cancellationToken);
                  return;
              }

              // The document type is only known at run time, so the generic helper is closed over it via reflection.
              var helper = MakeCreateCollectionHelper(nameof(CreateCollectionHelper), options);
              try
              {
                  helper.Invoke(this, new object[] { session, name, options, cancellationToken });
              }
              catch (TargetInvocationException exception) when (exception.InnerException != null)
              {
                  // Surface the helper's own exception, as the non-reflective branches above do.
                  RethrowInnerException(exception);
              }
          }

          /// <inheritdoc/>
          public override Task CreateCollectionAsync(string name, CreateCollectionOptions options, CancellationToken cancellationToken)
          {
              return UsingImplicitSessionAsync(session => CreateCollectionAsync(session, name, options, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override Task CreateCollectionAsync(IClientSessionHandle session, string name, CreateCollectionOptions options, CancellationToken cancellationToken)
          {
              Ensure.IsNotNull(session, nameof(session));
              Ensure.IsNotNullOrEmpty(name, nameof(name));

              if (options == null)
              {
                  return CreateCollectionHelperAsync<BsonDocument>(session, name, null, cancellationToken);
              }

              if (options.GetType() == typeof(CreateCollectionOptions))
              {
                  var genericOptions = CreateCollectionOptions<BsonDocument>.CoercedFrom(options);
                  return CreateCollectionHelperAsync<BsonDocument>(session, name, genericOptions, cancellationToken);
              }

              // The document type is only known at run time, so the generic helper is closed over it via reflection.
              var helper = MakeCreateCollectionHelper(nameof(CreateCollectionHelperAsync), options);
              try
              {
                  return (Task)helper.Invoke(this, new object[] { session, name, options, cancellationToken });
              }
              catch (TargetInvocationException exception) when (exception.InnerException != null)
              {
                  // Only exceptions thrown synchronously by the helper arrive here wrapped;
                  // failures of the returned task are observed by awaiting it.
                  RethrowInnerException(exception);
                  throw; // not reached: RethrowInnerException always throws
              }
          }

          /// <inheritdoc/>
          public override void CreateView<TDocument, TResult>(string viewName, string viewOn, PipelineDefinition<TDocument, TResult> pipeline, CreateViewOptions<TDocument> options = null, CancellationToken cancellationToken = default(CancellationToken))
          {
              UsingImplicitSession(session => CreateView(session, viewName, viewOn, pipeline, options, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override void CreateView<TDocument, TResult>(IClientSessionHandle session, string viewName, string viewOn, PipelineDefinition<TDocument, TResult> pipeline, CreateViewOptions<TDocument> options = null, CancellationToken cancellationToken = default(CancellationToken))
          {
              Ensure.IsNotNull(session, nameof(session));
              Ensure.IsNotNull(viewName, nameof(viewName));
              Ensure.IsNotNull(viewOn, nameof(viewOn));
              Ensure.IsNotNull(pipeline, nameof(pipeline));

              options = options ?? new CreateViewOptions<TDocument>();
              var operation = CreateCreateViewOperation(viewName, viewOn, pipeline, options);
              ExecuteWriteOperation(session, operation, cancellationToken);
          }

          /// <inheritdoc/>
          public override Task CreateViewAsync<TDocument, TResult>(string viewName, string viewOn, PipelineDefinition<TDocument, TResult> pipeline, CreateViewOptions<TDocument> options = null, CancellationToken cancellationToken = default(CancellationToken))
          {
              return UsingImplicitSessionAsync(session => CreateViewAsync(session, viewName, viewOn, pipeline, options, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override Task CreateViewAsync<TDocument, TResult>(IClientSessionHandle session, string viewName, string viewOn, PipelineDefinition<TDocument, TResult> pipeline, CreateViewOptions<TDocument> options = null, CancellationToken cancellationToken = default(CancellationToken))
          {
              Ensure.IsNotNull(session, nameof(session));
              Ensure.IsNotNull(viewName, nameof(viewName));
              Ensure.IsNotNull(viewOn, nameof(viewOn));
              Ensure.IsNotNull(pipeline, nameof(pipeline));

              options = options ?? new CreateViewOptions<TDocument>();
              var operation = CreateCreateViewOperation(viewName, viewOn, pipeline, options);
              return ExecuteWriteOperationAsync(session, operation, cancellationToken);
          }

          /// <inheritdoc/>
          public override void DropCollection(string name, CancellationToken cancellationToken)
          {
              UsingImplicitSession(session => DropCollection(session, name, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override void DropCollection(IClientSessionHandle session, string name, CancellationToken cancellationToken)
          {
              Ensure.IsNotNull(session, nameof(session));
              Ensure.IsNotNullOrEmpty(name, nameof(name));

              var operation = CreateDropCollectionOperation(name);
              ExecuteWriteOperation(session, operation, cancellationToken);
          }

          /// <inheritdoc/>
          public override Task DropCollectionAsync(string name, CancellationToken cancellationToken)
          {
              return UsingImplicitSessionAsync(session => DropCollectionAsync(session, name, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override Task DropCollectionAsync(IClientSessionHandle session, string name, CancellationToken cancellationToken)
          {
              Ensure.IsNotNull(session, nameof(session));
              Ensure.IsNotNullOrEmpty(name, nameof(name));

              var operation = CreateDropCollectionOperation(name);
              return ExecuteWriteOperationAsync(session, operation, cancellationToken);
          }

          /// <inheritdoc/>
          public override IMongoCollection<TDocument> GetCollection<TDocument>(string name, MongoCollectionSettings settings)
          {
              Ensure.IsNotNullOrEmpty(name, nameof(name));

              // Work on a fresh or cloned instance so the caller's settings object is not modified.
              settings = settings == null ? new MongoCollectionSettings() : settings.Clone();
              settings.ApplyDefaultValues(_settings);

              return new MongoCollectionImpl<TDocument>(this, new CollectionNamespace(_databaseNamespace, name), settings, _cluster, _operationExecutor);
          }

          /// <inheritdoc/>
          public override IAsyncCursor<string> ListCollectionNames(ListCollectionNamesOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
          {
              return UsingImplicitSession(session => ListCollectionNames(session, options, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override IAsyncCursor<string> ListCollectionNames(IClientSessionHandle session, ListCollectionNamesOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
          {
              Ensure.IsNotNull(session, nameof(session));

              var operation = CreateListCollectionNamesOperation(options);
              var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, ReadPreference.Primary);
              var cursor = ExecuteReadOperation(session, operation, effectiveReadPreference, cancellationToken);
              return new BatchTransformingAsyncCursor<BsonDocument, string>(cursor, ExtractCollectionNames);
          }

          /// <inheritdoc/>
          public override Task<IAsyncCursor<string>> ListCollectionNamesAsync(ListCollectionNamesOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
          {
              return UsingImplicitSessionAsync(session => ListCollectionNamesAsync(session, options, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override async Task<IAsyncCursor<string>> ListCollectionNamesAsync(IClientSessionHandle session, ListCollectionNamesOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
          {
              Ensure.IsNotNull(session, nameof(session));

              var operation = CreateListCollectionNamesOperation(options);
              var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, ReadPreference.Primary);
              var cursor = await ExecuteReadOperationAsync(session, operation, effectiveReadPreference, cancellationToken).ConfigureAwait(false);
              return new BatchTransformingAsyncCursor<BsonDocument, string>(cursor, ExtractCollectionNames);
          }

          /// <inheritdoc/>
          public override IAsyncCursor<BsonDocument> ListCollections(ListCollectionsOptions options, CancellationToken cancellationToken)
          {
              return UsingImplicitSession(session => ListCollections(session, options, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override IAsyncCursor<BsonDocument> ListCollections(IClientSessionHandle session, ListCollectionsOptions options, CancellationToken cancellationToken)
          {
              Ensure.IsNotNull(session, nameof(session));

              var operation = CreateListCollectionsOperation(options);
              var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, ReadPreference.Primary);
              return ExecuteReadOperation(session, operation, effectiveReadPreference, cancellationToken);
          }

          /// <inheritdoc/>
          public override Task<IAsyncCursor<BsonDocument>> ListCollectionsAsync(ListCollectionsOptions options, CancellationToken cancellationToken)
          {
              return UsingImplicitSessionAsync(session => ListCollectionsAsync(session, options, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override Task<IAsyncCursor<BsonDocument>> ListCollectionsAsync(IClientSessionHandle session, ListCollectionsOptions options, CancellationToken cancellationToken)
          {
              Ensure.IsNotNull(session, nameof(session));

              var operation = CreateListCollectionsOperation(options);
              var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, ReadPreference.Primary);
              return ExecuteReadOperationAsync(session, operation, effectiveReadPreference, cancellationToken);
          }

          /// <inheritdoc/>
          public override void RenameCollection(string oldName, string newName, RenameCollectionOptions options, CancellationToken cancellationToken)
          {
              UsingImplicitSession(session => RenameCollection(session, oldName, newName, options, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override void RenameCollection(IClientSessionHandle session, string oldName, string newName, RenameCollectionOptions options, CancellationToken cancellationToken)
          {
              Ensure.IsNotNull(session, nameof(session));
              Ensure.IsNotNullOrEmpty(oldName, nameof(oldName));
              Ensure.IsNotNullOrEmpty(newName, nameof(newName));

              options = options ?? new RenameCollectionOptions();
              var operation = CreateRenameCollectionOperation(oldName, newName, options);
              ExecuteWriteOperation(session, operation, cancellationToken);
          }

          /// <inheritdoc/>
          public override Task RenameCollectionAsync(string oldName, string newName, RenameCollectionOptions options, CancellationToken cancellationToken)
          {
              return UsingImplicitSessionAsync(session => RenameCollectionAsync(session, oldName, newName, options, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override Task RenameCollectionAsync(IClientSessionHandle session, string oldName, string newName, RenameCollectionOptions options, CancellationToken cancellationToken)
          {
              Ensure.IsNotNull(session, nameof(session));
              Ensure.IsNotNullOrEmpty(oldName, nameof(oldName));
              Ensure.IsNotNullOrEmpty(newName, nameof(newName));

              options = options ?? new RenameCollectionOptions();
              var operation = CreateRenameCollectionOperation(oldName, newName, options);
              return ExecuteWriteOperationAsync(session, operation, cancellationToken);
          }

          /// <inheritdoc/>
          public override TResult RunCommand<TResult>(Command<TResult> command, ReadPreference readPreference = null, CancellationToken cancellationToken = default(CancellationToken))
          {
              return UsingImplicitSession(session => RunCommand(session, command, readPreference, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override TResult RunCommand<TResult>(IClientSessionHandle session, Command<TResult> command, ReadPreference readPreference = null, CancellationToken cancellationToken = default(CancellationToken))
          {
              Ensure.IsNotNull(session, nameof(session));
              Ensure.IsNotNull(command, nameof(command));

              var operation = CreateRunCommandOperation(command);
              var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, readPreference, ReadPreference.Primary);
              return ExecuteReadOperation(session, operation, effectiveReadPreference, cancellationToken);
          }

          /// <inheritdoc/>
          public override Task<TResult> RunCommandAsync<TResult>(Command<TResult> command, ReadPreference readPreference = null, CancellationToken cancellationToken = default(CancellationToken))
          {
              return UsingImplicitSessionAsync(session => RunCommandAsync(session, command, readPreference, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override Task<TResult> RunCommandAsync<TResult>(IClientSessionHandle session, Command<TResult> command, ReadPreference readPreference = null, CancellationToken cancellationToken = default(CancellationToken))
          {
              Ensure.IsNotNull(session, nameof(session));
              Ensure.IsNotNull(command, nameof(command));

              var operation = CreateRunCommandOperation(command);
              var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, readPreference, ReadPreference.Primary);
              return ExecuteReadOperationAsync(session, operation, effectiveReadPreference, cancellationToken);
          }

          /// <inheritdoc/>
          public override IAsyncCursor<TResult> Watch<TResult>(
              PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
              ChangeStreamOptions options = null,
              CancellationToken cancellationToken = default(CancellationToken))
          {
              return UsingImplicitSession(session => Watch(session, pipeline, options, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override IAsyncCursor<TResult> Watch<TResult>(
              IClientSessionHandle session,
              PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
              ChangeStreamOptions options = null,
              CancellationToken cancellationToken = default(CancellationToken))
          {
              Ensure.IsNotNull(session, nameof(session));
              Ensure.IsNotNull(pipeline, nameof(pipeline));

              var operation = CreateChangeStreamOperation(pipeline, options);
              return ExecuteReadOperation(session, operation, cancellationToken);
          }

          /// <inheritdoc/>
          public override Task<IAsyncCursor<TResult>> WatchAsync<TResult>(
              PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
              ChangeStreamOptions options = null,
              CancellationToken cancellationToken = default(CancellationToken))
          {
              return UsingImplicitSessionAsync(session => WatchAsync(session, pipeline, options, cancellationToken), cancellationToken);
          }

          /// <inheritdoc/>
          public override Task<IAsyncCursor<TResult>> WatchAsync<TResult>(
              IClientSessionHandle session,
              PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
              ChangeStreamOptions options = null,
              CancellationToken cancellationToken = default(CancellationToken))
          {
              Ensure.IsNotNull(session, nameof(session));
              Ensure.IsNotNull(pipeline, nameof(pipeline));

              var operation = CreateChangeStreamOperation(pipeline, options);
              return ExecuteReadOperationAsync(session, operation, cancellationToken);
          }

          /// <inheritdoc/>
          public override IMongoDatabase WithReadConcern(ReadConcern readConcern)
          {
              Ensure.IsNotNull(readConcern, nameof(readConcern));

              var newSettings = _settings.Clone();
              newSettings.ReadConcern = readConcern;
              return new MongoDatabaseImpl(_client, _databaseNamespace, newSettings, _cluster, _operationExecutor);
          }

          /// <inheritdoc/>
          public override IMongoDatabase WithReadPreference(ReadPreference readPreference)
          {
              Ensure.IsNotNull(readPreference, nameof(readPreference));

              var newSettings = _settings.Clone();
              newSettings.ReadPreference = readPreference;
              return new MongoDatabaseImpl(_client, _databaseNamespace, newSettings, _cluster, _operationExecutor);
          }

          /// <inheritdoc/>
          public override IMongoDatabase WithWriteConcern(WriteConcern writeConcern)
          {
              Ensure.IsNotNull(writeConcern, nameof(writeConcern));

              var newSettings = _settings.Clone();
              newSettings.WriteConcern = writeConcern;
              return new MongoDatabaseImpl(_client, _databaseNamespace, newSettings, _cluster, _operationExecutor);
          }

          // private methods
          /// <summary>
          /// Builds and executes the create-collection operation for typed options.
          /// Invoked directly and, for run-time document types, through reflection by <c>CreateCollection</c>.
          /// </summary>
          private void CreateCollectionHelper<TDocument>(IClientSessionHandle session, string name, CreateCollectionOptions<TDocument> options, CancellationToken cancellationToken)
          {
              options = options ?? new CreateCollectionOptions<TDocument>();
              var operation = CreateCreateCollectionOperation(name, options);
              ExecuteWriteOperation(session, operation, cancellationToken);
          }

          /// <summary>
          /// Asynchronous counterpart of <c>CreateCollectionHelper</c>. The operation is built synchronously,
          /// so exceptions raised while building it are thrown rather than returned in the task.
          /// </summary>
          private Task CreateCollectionHelperAsync<TDocument>(IClientSessionHandle session, string name, CreateCollectionOptions<TDocument> options, CancellationToken cancellationToken)
          {
              options = options ?? new CreateCollectionOptions<TDocument>();
              var operation = CreateCreateCollectionOperation(name, options);
              return ExecuteWriteOperationAsync(session, operation, cancellationToken);
          }

          /// <summary>
          /// Maps untyped options onto a <see cref="CreateCollectionOperation"/>; a null
          /// <paramref name="options"/> is treated as a default-constructed instance.
          /// </summary>
          private CreateCollectionOperation CreateCreateCollectionOperation(string name, CreateCollectionOptions options)
          {
              options = options ?? new CreateCollectionOptions();
              var messageEncoderSettings = GetMessageEncoderSettings();

  #pragma warning disable 618
              return new CreateCollectionOperation(new CollectionNamespace(_databaseNamespace, name), messageEncoderSettings)
              {
                  AutoIndexId = options.AutoIndexId,
                  Collation = options.Collation,
                  Capped = options.Capped,
                  MaxDocuments = options.MaxDocuments,
                  MaxSize = options.MaxSize,
                  NoPadding = options.NoPadding,
                  StorageEngine = options.StorageEngine,
                  UsePowerOf2Sizes = options.UsePowerOf2Sizes,
                  WriteConcern = _settings.WriteConcern
              };
  #pragma warning restore 618
          }

          /// <summary>
          /// Maps typed options onto a <see cref="CreateCollectionOperation"/>. When a validator is set it is
          /// rendered with the options' serializer and registry, falling back to the registry's
          /// <typeparamref name="TDocument"/> serializer and <see cref="BsonSerializer.SerializerRegistry"/>.
          /// </summary>
          private CreateCollectionOperation CreateCreateCollectionOperation<TDocument>(string name, CreateCollectionOptions<TDocument> options)
          {
              var messageEncoderSettings = GetMessageEncoderSettings();

              BsonDocument validator = null;
              if (options.Validator != null)
              {
                  var serializerRegistry = options.SerializerRegistry ?? BsonSerializer.SerializerRegistry;
                  var documentSerializer = options.DocumentSerializer ?? serializerRegistry.GetSerializer<TDocument>();
                  validator = options.Validator.Render(documentSerializer, serializerRegistry);
              }

  #pragma warning disable 618
              return new CreateCollectionOperation(new CollectionNamespace(_databaseNamespace, name), messageEncoderSettings)
              {
                  AutoIndexId = options.AutoIndexId,
                  Capped = options.Capped,
                  Collation = options.Collation,
                  IndexOptionDefaults = options.IndexOptionDefaults?.ToBsonDocument(),
                  MaxDocuments = options.MaxDocuments,
                  MaxSize = options.MaxSize,
                  NoPadding = options.NoPadding,
                  StorageEngine = options.StorageEngine,
                  UsePowerOf2Sizes = options.UsePowerOf2Sizes,
                  ValidationAction = options.ValidationAction,
                  ValidationLevel = options.ValidationLevel,
                  Validator = validator,
                  WriteConcern = _settings.WriteConcern
              };
  #pragma warning restore 618
          }

          /// <summary>
          /// Renders the pipeline and builds the <see cref="CreateViewOperation"/> for it.
          /// <paramref name="options"/> must not be null.
          /// </summary>
          private CreateViewOperation CreateCreateViewOperation<TDocument, TResult>(string viewName, string viewOn, PipelineDefinition<TDocument, TResult> pipeline, CreateViewOptions<TDocument> options)
          {
              var serializerRegistry = options.SerializerRegistry ?? BsonSerializer.SerializerRegistry;
              var documentSerializer = options.DocumentSerializer ?? serializerRegistry.GetSerializer<TDocument>();
              var pipelineDocuments = pipeline.Render(documentSerializer, serializerRegistry).Documents;

              return new CreateViewOperation(_databaseNamespace, viewName, viewOn, pipelineDocuments, GetMessageEncoderSettings())
              {
                  Collation = options.Collation,
                  WriteConcern = _settings.WriteConcern
              };
          }

          /// <summary>
          /// Builds the drop operation for the named collection using this database's write concern.
          /// </summary>
          private DropCollectionOperation CreateDropCollectionOperation(string name)
          {
              var collectionNamespace = new CollectionNamespace(_databaseNamespace, name);
              var messageEncoderSettings = GetMessageEncoderSettings();

              return new DropCollectionOperation(collectionNamespace, messageEncoderSettings)
              {
                  WriteConcern = _settings.WriteConcern
              };
          }

          /// <summary>
          /// Builds a list-collections operation with <c>NameOnly</c> set; the filter, if any, is rendered
          /// with the settings' serializer registry.
          /// </summary>
          private ListCollectionsOperation CreateListCollectionNamesOperation(ListCollectionNamesOptions options)
          {
              var messageEncoderSettings = GetMessageEncoderSettings();

              return new ListCollectionsOperation(_databaseNamespace, messageEncoderSettings)
              {
                  Filter = options?.Filter?.Render(_settings.SerializerRegistry.GetSerializer<BsonDocument>(), _settings.SerializerRegistry),
                  NameOnly = true
              };
          }

          /// <summary>
          /// Builds a list-collections operation; the filter, if any, is rendered with the settings'
          /// serializer registry.
          /// </summary>
          private ListCollectionsOperation CreateListCollectionsOperation(ListCollectionsOptions options)
          {
              var messageEncoderSettings = GetMessageEncoderSettings();

              return new ListCollectionsOperation(_databaseNamespace, messageEncoderSettings)
              {
                  Filter = options?.Filter?.Render(_settings.SerializerRegistry.GetSerializer<BsonDocument>(), _settings.SerializerRegistry)
              };
          }

          /// <summary>
          /// Creates a read binding over a fork of the session's wrapped core session.
          /// </summary>
          /// <exception cref="InvalidOperationException">
          /// The session is in a transaction and the read preference mode is not primary.
          /// </exception>
          private IReadBinding CreateReadBinding(IClientSessionHandle session, ReadPreference readPreference)
          {
              if (session.IsInTransaction && readPreference.ReadPreferenceMode != ReadPreferenceMode.Primary)
              {
                  throw new InvalidOperationException("Read preference in a transaction must be primary.");
              }

              var binding = new ReadPreferenceBinding(_cluster, readPreference, session.WrappedCoreSession.Fork());
              return new ReadBindingHandle(binding);
          }

          /// <summary>
          /// Creates a writable-server binding over a fork of the session's wrapped core session.
          /// </summary>
          private IWriteBindingHandle CreateReadWriteBinding(IClientSessionHandle session)
          {
              var binding = new WritableServerBinding(_cluster, session.WrappedCoreSession.Fork());
              return new ReadWriteBindingHandle(binding);
          }

          /// <summary>
          /// Builds the rename operation; both names are resolved inside this database's namespace.
          /// <paramref name="options"/> must not be null.
          /// </summary>
          private RenameCollectionOperation CreateRenameCollectionOperation(string oldName, string newName, RenameCollectionOptions options)
          {
              var messageEncoderSettings = GetMessageEncoderSettings();

              return new RenameCollectionOperation(
                  new CollectionNamespace(_databaseNamespace, oldName),
                  new CollectionNamespace(_databaseNamespace, newName),
                  messageEncoderSettings)
              {
                  DropTarget = options.DropTarget,
                  WriteConcern = _settings.WriteConcern
              };
          }

          /// <summary>
          /// Renders the command with the settings' serializer registry and wraps it in a read command operation.
          /// </summary>
          private ReadCommandOperation<TResult> CreateRunCommandOperation<TResult>(Command<TResult> command)
          {
              var renderedCommand = command.Render(_settings.SerializerRegistry);
              var messageEncoderSettings = GetMessageEncoderSettings();

              return new ReadCommandOperation<TResult>(_databaseNamespace, renderedCommand.Document, renderedCommand.ResultSerializer, messageEncoderSettings);
          }

          /// <summary>
          /// Delegates to <c>ChangeStreamHelper</c> to build a change stream operation for this database,
          /// passing the settings' read concern.
          /// </summary>
          private ChangeStreamOperation<TResult> CreateChangeStreamOperation<TResult>(
              PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
              ChangeStreamOptions options)
          {
              return ChangeStreamHelper.CreateChangeStreamOperation(this, pipeline, options, _settings.ReadConcern, GetMessageEncoderSettings());
          }

          /// <summary>
          /// Projects each collection-info document onto the string value of its "name" element.
          /// </summary>
          private IEnumerable<string> ExtractCollectionNames(IEnumerable<BsonDocument> collections)
          {
              return collections.Select(collection => collection["name"].AsString);
          }

          /// <summary>
          /// Executes a read operation, resolving the read preference from the session with the settings'
          /// read preference passed as the default.
          /// </summary>
          private T ExecuteReadOperation<T>(IClientSessionHandle session, IReadOperation<T> operation, CancellationToken cancellationToken)
          {
              var readPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, _settings.ReadPreference);
              return ExecuteReadOperation(session, operation, readPreference, cancellationToken);
          }

          /// <summary>
          /// Executes a read operation on a read binding that is disposed once the executor returns.
          /// </summary>
          private T ExecuteReadOperation<T>(IClientSessionHandle session, IReadOperation<T> operation, ReadPreference readPreference, CancellationToken cancellationToken)
          {
              using (var binding = CreateReadBinding(session, readPreference))
              {
                  return _operationExecutor.ExecuteReadOperation(binding, operation, cancellationToken);
              }
          }

          /// <summary>
          /// Asynchronously executes a read operation, resolving the read preference from the session with
          /// the settings' read preference passed as the default.
          /// </summary>
          private Task<T> ExecuteReadOperationAsync<T>(IClientSessionHandle session, IReadOperation<T> operation, CancellationToken cancellationToken)
          {
              var readPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, _settings.ReadPreference);
              return ExecuteReadOperationAsync(session, operation, readPreference, cancellationToken);
          }

          /// <summary>
          /// Asynchronously executes a read operation on a read binding that is disposed once the
          /// executor's task has completed.
          /// </summary>
          private async Task<T> ExecuteReadOperationAsync<T>(IClientSessionHandle session, IReadOperation<T> operation, ReadPreference readPreference, CancellationToken cancellationToken)
          {
              using (var binding = CreateReadBinding(session, readPreference))
              {
                  return await _operationExecutor.ExecuteReadOperationAsync(binding, operation, cancellationToken).ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Executes a write operation on a read-write binding that is disposed once the executor returns.
          /// </summary>
          private T ExecuteWriteOperation<T>(IClientSessionHandle session, IWriteOperation<T> operation, CancellationToken cancellationToken)
          {
              using (var binding = CreateReadWriteBinding(session))
              {
                  return _operationExecutor.ExecuteWriteOperation(binding, operation, cancellationToken);
              }
          }

          /// <summary>
          /// Asynchronously executes a write operation on a read-write binding that is disposed once the
          /// executor's task has completed.
          /// </summary>
          private async Task<T> ExecuteWriteOperationAsync<T>(IClientSessionHandle session, IWriteOperation<T> operation, CancellationToken cancellationToken)
          {
              using (var binding = CreateReadWriteBinding(session))
              {
                  return await _operationExecutor.ExecuteWriteOperationAsync(binding, operation, cancellationToken).ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Builds message encoder settings from the database settings; the read and write encodings
          /// fall back to <c>Utf8Encodings.Strict</c> when not set.
          /// </summary>
          private MessageEncoderSettings GetMessageEncoderSettings()
          {
              return new MessageEncoderSettings
              {
                  { MessageEncoderSettingsName.GuidRepresentation, _settings.GuidRepresentation },
                  { MessageEncoderSettingsName.ReadEncoding, _settings.ReadEncoding ?? Utf8Encodings.Strict },
                  { MessageEncoderSettingsName.WriteEncoding, _settings.WriteEncoding ?? Utf8Encodings.Strict }
              };
          }

          /// <summary>
          /// Finds the <c>TDocument</c> of the closed <see cref="CreateCollectionOptions{TDocument}"/> that the
          /// run-time type of <paramref name="options"/> is or derives from.
          /// </summary>
          /// <exception cref="ArgumentException">
          /// The options type has no <see cref="CreateCollectionOptions{TDocument}"/> in its base-type chain.
          /// </exception>
          private static Type GetCreateCollectionOptionsDocumentType(CreateCollectionOptions options)
          {
              // Walk the hierarchy so that classes derived from CreateCollectionOptions<TDocument> also resolve.
              for (var type = options.GetType(); type != null; type = type.GetTypeInfo().BaseType)
              {
                  var typeInfo = type.GetTypeInfo();
                  if (typeInfo.IsGenericType && typeInfo.GetGenericTypeDefinition() == typeof(CreateCollectionOptions<>))
                  {
                      return typeInfo.GetGenericArguments()[0];
                  }
              }

              throw new ArgumentException(
                  $"Options of type {options.GetType().FullName} must be CreateCollectionOptions or derive from CreateCollectionOptions<TDocument>.",
                  nameof(options));
          }

          /// <summary>
          /// Closes the named private generic helper of this class over the document type of
          /// <paramref name="options"/>.
          /// </summary>
          private static MethodInfo MakeCreateCollectionHelper(string helperName, CreateCollectionOptions options)
          {
              var documentType = GetCreateCollectionOptionsDocumentType(options);
              var genericMethodDefinition = typeof(MongoDatabaseImpl).GetTypeInfo().GetMethod(helperName, BindingFlags.NonPublic | BindingFlags.Instance);
              return genericMethodDefinition.MakeGenericMethod(documentType);
          }

          /// <summary>
          /// Rethrows the exception wrapped by a reflective invocation, preserving its original stack trace.
          /// This method never returns normally.
          /// </summary>
          private static void RethrowInnerException(TargetInvocationException exception)
          {
              System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
          }

          /// <summary>
          /// Runs <paramref name="func"/> with an implicit session started by the executor; the session is
          /// disposed when the callback returns or throws.
          /// </summary>
          private void UsingImplicitSession(Action<IClientSessionHandle> func, CancellationToken cancellationToken)
          {
              using (var session = _operationExecutor.StartImplicitSession(cancellationToken))
              {
                  func(session);
              }
          }

          /// <summary>
          /// Runs <paramref name="func"/> with an implicit session started by the executor and returns its
          /// result; the session is disposed when the callback returns or throws.
          /// </summary>
          private TResult UsingImplicitSession<TResult>(Func<IClientSessionHandle, TResult> func, CancellationToken cancellationToken)
          {
              using (var session = _operationExecutor.StartImplicitSession(cancellationToken))
              {
                  return func(session);
              }
          }

          /// <summary>
          /// Awaits <paramref name="funcAsync"/> with an implicit session started by the executor; the
          /// session is disposed after the awaited task completes.
          /// </summary>
          private async Task UsingImplicitSessionAsync(Func<IClientSessionHandle, Task> funcAsync, CancellationToken cancellationToken)
          {
              using (var session = await _operationExecutor.StartImplicitSessionAsync(cancellationToken).ConfigureAwait(false))
              {
                  await funcAsync(session).ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Awaits <paramref name="funcAsync"/> with an implicit session started by the executor and
          /// returns its result; the session is disposed after the awaited task completes.
          /// </summary>
          private async Task<TResult> UsingImplicitSessionAsync<TResult>(Func<IClientSessionHandle, Task<TResult>> funcAsync, CancellationToken cancellationToken)
          {
              using (var session = await _operationExecutor.StartImplicitSessionAsync(cancellationToken).ConfigureAwait(false))
              {
                  return await funcAsync(session).ConfigureAwait(false);
              }
          }
      }
  }