MongoCollectionImpl.cs 76 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434
  1. /* Copyright 2010-present MongoDB Inc.
  2. *
  3. * Licensed under the Apache License, Version 2.0 (the "License");
  4. * you may not use this file except in compliance with the License.
  5. * You may obtain a copy of the License at
  6. *
  7. * http://www.apache.org/licenses/LICENSE-2.0
  8. *
  9. * Unless required by applicable law or agreed to in writing, software
  10. * distributed under the License is distributed on an "AS IS" BASIS,
  11. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. * See the License for the specific language governing permissions and
  13. * limitations under the License.
  14. */
  15. using System;
  16. using System.Collections.Generic;
  17. using System.Linq;
  18. using System.Threading;
  19. using System.Threading.Tasks;
  20. using MongoDB.Bson;
  21. using MongoDB.Bson.IO;
  22. using MongoDB.Bson.Serialization;
  23. using MongoDB.Driver.Core.Bindings;
  24. using MongoDB.Driver.Core.Clusters;
  25. using MongoDB.Driver.Core.Misc;
  26. using MongoDB.Driver.Core.Operations;
  27. using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
  28. using MongoDB.Driver.Linq;
  29. namespace MongoDB.Driver
  30. {
  31. internal sealed class MongoCollectionImpl<TDocument> : MongoCollectionBase<TDocument>
  32. {
  33. // fields
  34. private readonly ICluster _cluster;
  35. private readonly CollectionNamespace _collectionNamespace;
  36. private readonly IMongoDatabase _database;
  37. private readonly MessageEncoderSettings _messageEncoderSettings;
  38. private readonly IOperationExecutor _operationExecutor;
  39. private readonly IBsonSerializer<TDocument> _documentSerializer;
  40. private readonly MongoCollectionSettings _settings;
  41. // constructors
  42. public MongoCollectionImpl(IMongoDatabase database, CollectionNamespace collectionNamespace, MongoCollectionSettings settings, ICluster cluster, IOperationExecutor operationExecutor)
  43. : this(database, collectionNamespace, settings, cluster, operationExecutor, Ensure.IsNotNull(settings, "settings").SerializerRegistry.GetSerializer<TDocument>())
  44. {
  45. }
  46. private MongoCollectionImpl(IMongoDatabase database, CollectionNamespace collectionNamespace, MongoCollectionSettings settings, ICluster cluster, IOperationExecutor operationExecutor, IBsonSerializer<TDocument> documentSerializer)
  47. {
  48. _database = Ensure.IsNotNull(database, nameof(database));
  49. _collectionNamespace = Ensure.IsNotNull(collectionNamespace, nameof(collectionNamespace));
  50. _settings = Ensure.IsNotNull(settings, nameof(settings)).Freeze();
  51. _cluster = Ensure.IsNotNull(cluster, nameof(cluster));
  52. _operationExecutor = Ensure.IsNotNull(operationExecutor, nameof(operationExecutor));
  53. _documentSerializer = Ensure.IsNotNull(documentSerializer, nameof(documentSerializer));
  54. _messageEncoderSettings = new MessageEncoderSettings
  55. {
  56. { MessageEncoderSettingsName.GuidRepresentation, _settings.GuidRepresentation },
  57. { MessageEncoderSettingsName.ReadEncoding, _settings.ReadEncoding ?? Utf8Encodings.Strict },
  58. { MessageEncoderSettingsName.WriteEncoding, _settings.WriteEncoding ?? Utf8Encodings.Strict }
  59. };
  60. }
  61. // properties
  62. public override CollectionNamespace CollectionNamespace
  63. {
  64. get { return _collectionNamespace; }
  65. }
  66. public override IMongoDatabase Database
  67. {
  68. get { return _database; }
  69. }
  70. public override IBsonSerializer<TDocument> DocumentSerializer
  71. {
  72. get { return _documentSerializer; }
  73. }
  74. public override IMongoIndexManager<TDocument> Indexes
  75. {
  76. get { return new MongoIndexManager(this); }
  77. }
  78. public override MongoCollectionSettings Settings
  79. {
  80. get { return _settings; }
  81. }
  82. // public methods
  83. public override IAsyncCursor<TResult> Aggregate<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options, CancellationToken cancellationToken = default(CancellationToken))
  84. {
  85. return UsingImplicitSession(session => Aggregate(session, pipeline, options, cancellationToken), cancellationToken);
  86. }
  87. public override IAsyncCursor<TResult> Aggregate<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options, CancellationToken cancellationToken = default(CancellationToken))
  88. {
  89. Ensure.IsNotNull(session, nameof(session));
  90. var renderedPipeline = Ensure.IsNotNull(pipeline, nameof(pipeline)).Render(_documentSerializer, _settings.SerializerRegistry);
  91. options = options ?? new AggregateOptions();
  92. var last = renderedPipeline.Documents.LastOrDefault();
  93. if (last != null && last.GetElement(0).Name == "$out")
  94. {
  95. var aggregateOperation = CreateAggregateToCollectionOperation(renderedPipeline, options);
  96. ExecuteWriteOperation(session, aggregateOperation, cancellationToken);
  97. // we want to delay execution of the find because the user may
  98. // not want to iterate the results at all...
  99. var findOperation = CreateAggregateToCollectionFindOperation(last, renderedPipeline.OutputSerializer, options);
  100. var forkedSession = session.Fork();
  101. var deferredCursor = new DeferredAsyncCursor<TResult>(
  102. () => forkedSession.Dispose(),
  103. ct => ExecuteReadOperation(forkedSession, findOperation, ReadPreference.Primary, ct),
  104. ct => ExecuteReadOperationAsync(forkedSession, findOperation, ReadPreference.Primary, ct));
  105. return deferredCursor;
  106. }
  107. else
  108. {
  109. var aggregateOperation = CreateAggregateOperation(renderedPipeline, options);
  110. return ExecuteReadOperation(session, aggregateOperation, cancellationToken);
  111. }
  112. }
  113. public override Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options, CancellationToken cancellationToken = default(CancellationToken))
  114. {
  115. return UsingImplicitSessionAsync(session => AggregateAsync(session, pipeline, options, cancellationToken), cancellationToken);
  116. }
  117. public override async Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options, CancellationToken cancellationToken = default(CancellationToken))
  118. {
  119. Ensure.IsNotNull(session, nameof(session));
  120. var renderedPipeline = Ensure.IsNotNull(pipeline, nameof(pipeline)).Render(_documentSerializer, _settings.SerializerRegistry);
  121. options = options ?? new AggregateOptions();
  122. var last = renderedPipeline.Documents.LastOrDefault();
  123. if (last != null && last.GetElement(0).Name == "$out")
  124. {
  125. var aggregateOperation = CreateAggregateToCollectionOperation(renderedPipeline, options);
  126. await ExecuteWriteOperationAsync(session, aggregateOperation, cancellationToken).ConfigureAwait(false);
  127. // we want to delay execution of the find because the user may
  128. // not want to iterate the results at all...
  129. var findOperation = CreateAggregateToCollectionFindOperation(last, renderedPipeline.OutputSerializer, options);
  130. var forkedSession = session.Fork();
  131. var deferredCursor = new DeferredAsyncCursor<TResult>(
  132. () => forkedSession.Dispose(),
  133. ct => ExecuteReadOperation(forkedSession, findOperation, ReadPreference.Primary, ct),
  134. ct => ExecuteReadOperationAsync(forkedSession, findOperation, ReadPreference.Primary, ct));
  135. return await Task.FromResult<IAsyncCursor<TResult>>(deferredCursor).ConfigureAwait(false);
  136. }
  137. else
  138. {
  139. var aggregateOperation = CreateAggregateOperation(renderedPipeline, options);
  140. return await ExecuteReadOperationAsync(session, aggregateOperation, cancellationToken).ConfigureAwait(false);
  141. }
  142. }
  143. public override BulkWriteResult<TDocument> BulkWrite(IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options, CancellationToken cancellationToken = default(CancellationToken))
  144. {
  145. return UsingImplicitSession(session => BulkWrite(session, requests, options, cancellationToken), cancellationToken);
  146. }
  147. public override BulkWriteResult<TDocument> BulkWrite(IClientSessionHandle session, IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options, CancellationToken cancellationToken = default(CancellationToken))
  148. {
  149. Ensure.IsNotNull(session, nameof(session));
  150. Ensure.IsNotNull(requests, nameof(requests));
  151. if (!requests.Any())
  152. {
  153. throw new ArgumentException("Must contain at least 1 request.", "requests");
  154. }
  155. options = options ?? new BulkWriteOptions();
  156. var operation = CreateBulkWriteOperation(requests, options);
  157. try
  158. {
  159. var result = ExecuteWriteOperation(session, operation, cancellationToken);
  160. return BulkWriteResult<TDocument>.FromCore(result, requests);
  161. }
  162. catch (MongoBulkWriteOperationException ex)
  163. {
  164. throw MongoBulkWriteException<TDocument>.FromCore(ex, requests.ToList());
  165. }
  166. }
  167. public override Task<BulkWriteResult<TDocument>> BulkWriteAsync(IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options, CancellationToken cancellationToken = default(CancellationToken))
  168. {
  169. return UsingImplicitSessionAsync(session => BulkWriteAsync(session, requests, options, cancellationToken), cancellationToken);
  170. }
  171. public override async Task<BulkWriteResult<TDocument>> BulkWriteAsync(IClientSessionHandle session, IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options, CancellationToken cancellationToken = default(CancellationToken))
  172. {
  173. Ensure.IsNotNull(session, nameof(session));
  174. Ensure.IsNotNull(requests, nameof(requests));
  175. if (!requests.Any())
  176. {
  177. throw new ArgumentException("Must contain at least 1 request.", "requests");
  178. }
  179. options = options ?? new BulkWriteOptions();
  180. var operation = CreateBulkWriteOperation(requests, options);
  181. try
  182. {
  183. var result = await ExecuteWriteOperationAsync(session, operation, cancellationToken).ConfigureAwait(false);
  184. return BulkWriteResult<TDocument>.FromCore(result, requests);
  185. }
  186. catch (MongoBulkWriteOperationException ex)
  187. {
  188. throw MongoBulkWriteException<TDocument>.FromCore(ex, requests.ToList());
  189. }
  190. }
  191. [Obsolete("Use CountDocuments or EstimatedDocumentCount instead.")]
  192. public override long Count(FilterDefinition<TDocument> filter, CountOptions options, CancellationToken cancellationToken = default(CancellationToken))
  193. {
  194. return UsingImplicitSession(session => Count(session, filter, options, cancellationToken), cancellationToken);
  195. }
  196. [Obsolete("Use CountDocuments or EstimatedDocumentCount instead.")]
  197. public override long Count(IClientSessionHandle session, FilterDefinition<TDocument> filter, CountOptions options, CancellationToken cancellationToken = default(CancellationToken))
  198. {
  199. Ensure.IsNotNull(session, nameof(session));
  200. Ensure.IsNotNull(filter, nameof(filter));
  201. options = options ?? new CountOptions();
  202. var operation = CreateCountOperation(filter, options);
  203. return ExecuteReadOperation(session, operation, cancellationToken);
  204. }
  205. [Obsolete("Use CountDocumentsAsync or EstimatedDocumentCountAsync instead.")]
  206. public override Task<long> CountAsync(FilterDefinition<TDocument> filter, CountOptions options, CancellationToken cancellationToken = default(CancellationToken))
  207. {
  208. return UsingImplicitSessionAsync(session => CountAsync(session, filter, options, cancellationToken), cancellationToken);
  209. }
  210. [Obsolete("Use CountDocumentsAsync or EstimatedDocumentCountAsync instead.")]
  211. public override Task<long> CountAsync(IClientSessionHandle session, FilterDefinition<TDocument> filter, CountOptions options, CancellationToken cancellationToken = default(CancellationToken))
  212. {
  213. Ensure.IsNotNull(session, nameof(session));
  214. Ensure.IsNotNull(filter, nameof(filter));
  215. options = options ?? new CountOptions();
  216. var operation = CreateCountOperation(filter, options);
  217. return ExecuteReadOperationAsync(session, operation, cancellationToken);
  218. }
  219. public override long CountDocuments(FilterDefinition<TDocument> filter, CountOptions options, CancellationToken cancellationToken = default(CancellationToken))
  220. {
  221. return UsingImplicitSession(session => CountDocuments(session, filter, options, cancellationToken), cancellationToken);
  222. }
  223. public override long CountDocuments(IClientSessionHandle session, FilterDefinition<TDocument> filter, CountOptions options, CancellationToken cancellationToken = default(CancellationToken))
  224. {
  225. Ensure.IsNotNull(session, nameof(session));
  226. Ensure.IsNotNull(filter, nameof(filter));
  227. options = options ?? new CountOptions();
  228. var operation = CreateCountDocumentsOperation(filter, options);
  229. return ExecuteReadOperation(session, operation, cancellationToken);
  230. }
  231. public override Task<long> CountDocumentsAsync(FilterDefinition<TDocument> filter, CountOptions options, CancellationToken cancellationToken = default(CancellationToken))
  232. {
  233. return UsingImplicitSessionAsync(session => CountDocumentsAsync(session, filter, options, cancellationToken), cancellationToken);
  234. }
  235. public override Task<long> CountDocumentsAsync(IClientSessionHandle session, FilterDefinition<TDocument> filter, CountOptions options, CancellationToken cancellationToken = default(CancellationToken))
  236. {
  237. Ensure.IsNotNull(session, nameof(session));
  238. Ensure.IsNotNull(filter, nameof(filter));
  239. options = options ?? new CountOptions();
  240. var operation = CreateCountDocumentsOperation(filter, options);
  241. return ExecuteReadOperationAsync(session, operation, cancellationToken);
  242. }
  243. public override IAsyncCursor<TField> Distinct<TField>(FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options, CancellationToken cancellationToken = default(CancellationToken))
  244. {
  245. return UsingImplicitSession(session => Distinct(session, field, filter, options, cancellationToken), cancellationToken);
  246. }
  247. public override IAsyncCursor<TField> Distinct<TField>(IClientSessionHandle session, FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options, CancellationToken cancellationToken = default(CancellationToken))
  248. {
  249. Ensure.IsNotNull(session, nameof(session));
  250. Ensure.IsNotNull(field, nameof(field));
  251. Ensure.IsNotNull(filter, nameof(filter));
  252. options = options ?? new DistinctOptions();
  253. var operation = CreateDistinctOperation(field, filter, options);
  254. return ExecuteReadOperation(session, operation, cancellationToken);
  255. }
  256. public override Task<IAsyncCursor<TField>> DistinctAsync<TField>(FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options, CancellationToken cancellationToken = default(CancellationToken))
  257. {
  258. return UsingImplicitSessionAsync(session => DistinctAsync(session, field, filter, options, cancellationToken), cancellationToken);
  259. }
  260. public override Task<IAsyncCursor<TField>> DistinctAsync<TField>(IClientSessionHandle session, FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options, CancellationToken cancellationToken = default(CancellationToken))
  261. {
  262. Ensure.IsNotNull(session, nameof(session));
  263. Ensure.IsNotNull(field, nameof(field));
  264. Ensure.IsNotNull(filter, nameof(filter));
  265. options = options ?? new DistinctOptions();
  266. var operation = CreateDistinctOperation(field, filter, options);
  267. return ExecuteReadOperationAsync(session, operation, cancellationToken);
  268. }
  269. public override long EstimatedDocumentCount(EstimatedDocumentCountOptions options, CancellationToken cancellationToken = default(CancellationToken))
  270. {
  271. return UsingImplicitSession(session =>
  272. {
  273. var operation = CreateEstimatedDocumentCountOperation(options);
  274. return ExecuteReadOperation(session, operation, cancellationToken);
  275. });
  276. }
  277. public override Task<long> EstimatedDocumentCountAsync(EstimatedDocumentCountOptions options, CancellationToken cancellationToken = default(CancellationToken))
  278. {
  279. return UsingImplicitSessionAsync(session =>
  280. {
  281. var operation = CreateEstimatedDocumentCountOperation(options);
  282. return ExecuteReadOperationAsync(session, operation, cancellationToken);
  283. });
  284. }
  285. public override IAsyncCursor<TProjection> FindSync<TProjection>(FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  286. {
  287. return UsingImplicitSession(session => FindSync(session, filter, options, cancellationToken), cancellationToken);
  288. }
  289. public override IAsyncCursor<TProjection> FindSync<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  290. {
  291. Ensure.IsNotNull(session, nameof(session));
  292. Ensure.IsNotNull(filter, nameof(filter));
  293. options = options ?? new FindOptions<TDocument, TProjection>();
  294. var operation = CreateFindOperation<TProjection>(filter, options);
  295. return ExecuteReadOperation(session, operation, cancellationToken);
  296. }
  297. public override Task<IAsyncCursor<TProjection>> FindAsync<TProjection>(FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  298. {
  299. return UsingImplicitSessionAsync(session => FindAsync(session, filter, options, cancellationToken), cancellationToken);
  300. }
  301. public override Task<IAsyncCursor<TProjection>> FindAsync<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  302. {
  303. Ensure.IsNotNull(session, nameof(session));
  304. Ensure.IsNotNull(filter, nameof(filter));
  305. options = options ?? new FindOptions<TDocument, TProjection>();
  306. var operation = CreateFindOperation<TProjection>(filter, options);
  307. return ExecuteReadOperationAsync(session, operation, cancellationToken);
  308. }
  309. public override TProjection FindOneAndDelete<TProjection>(FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  310. {
  311. return UsingImplicitSession(session => FindOneAndDelete(session, filter, options, cancellationToken), cancellationToken);
  312. }
  313. public override TProjection FindOneAndDelete<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  314. {
  315. Ensure.IsNotNull(session, nameof(session));
  316. Ensure.IsNotNull(filter, nameof(filter));
  317. options = options ?? new FindOneAndDeleteOptions<TDocument, TProjection>();
  318. var operation = CreateFindOneAndDeleteOperation<TProjection>(filter, options);
  319. return ExecuteWriteOperation(session, operation, cancellationToken);
  320. }
  321. public override Task<TProjection> FindOneAndDeleteAsync<TProjection>(FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  322. {
  323. return UsingImplicitSessionAsync(session => FindOneAndDeleteAsync(session, filter, options, cancellationToken), cancellationToken);
  324. }
  325. public override Task<TProjection> FindOneAndDeleteAsync<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  326. {
  327. Ensure.IsNotNull(session, nameof(session));
  328. Ensure.IsNotNull(filter, nameof(filter));
  329. options = options ?? new FindOneAndDeleteOptions<TDocument, TProjection>();
  330. var operation = CreateFindOneAndDeleteOperation<TProjection>(filter, options);
  331. return ExecuteWriteOperationAsync(session, operation, cancellationToken);
  332. }
  333. public override TProjection FindOneAndReplace<TProjection>(FilterDefinition<TDocument> filter, TDocument replacement, FindOneAndReplaceOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  334. {
  335. return UsingImplicitSession(session => FindOneAndReplace(session, filter, replacement, options, cancellationToken), cancellationToken);
  336. }
  337. public override TProjection FindOneAndReplace<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, TDocument replacement, FindOneAndReplaceOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  338. {
  339. Ensure.IsNotNull(session, nameof(session));
  340. Ensure.IsNotNull(filter, nameof(filter));
  341. var replacementObject = Ensure.IsNotNull((object)replacement, nameof(replacement)); // only box once if it's a struct
  342. options = options ?? new FindOneAndReplaceOptions<TDocument, TProjection>();
  343. var operation = CreateFindOneAndReplaceOperation(filter, replacementObject, options);
  344. return ExecuteWriteOperation(session, operation, cancellationToken);
  345. }
  346. public override Task<TProjection> FindOneAndReplaceAsync<TProjection>(FilterDefinition<TDocument> filter, TDocument replacement, FindOneAndReplaceOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  347. {
  348. return UsingImplicitSessionAsync(session => FindOneAndReplaceAsync(session, filter, replacement, options, cancellationToken), cancellationToken);
  349. }
  350. public override Task<TProjection> FindOneAndReplaceAsync<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, TDocument replacement, FindOneAndReplaceOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  351. {
  352. Ensure.IsNotNull(session, nameof(session));
  353. Ensure.IsNotNull(filter, nameof(filter));
  354. var replacementObject = Ensure.IsNotNull((object)replacement, nameof(replacement)); // only box once if it's a struct
  355. options = options ?? new FindOneAndReplaceOptions<TDocument, TProjection>();
  356. var operation = CreateFindOneAndReplaceOperation(filter, replacementObject, options);
  357. return ExecuteWriteOperationAsync(session, operation, cancellationToken);
  358. }
  359. public override TProjection FindOneAndUpdate<TProjection>(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  360. {
  361. return UsingImplicitSession(session => FindOneAndUpdate(session, filter, update, options, cancellationToken), cancellationToken);
  362. }
  363. public override TProjection FindOneAndUpdate<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  364. {
  365. Ensure.IsNotNull(session, nameof(session));
  366. Ensure.IsNotNull(filter, nameof(filter));
  367. Ensure.IsNotNull(update, nameof(update));
  368. options = options ?? new FindOneAndUpdateOptions<TDocument, TProjection>();
  369. var operation = CreateFindOneAndUpdateOperation(filter, update, options);
  370. return ExecuteWriteOperation(session, operation, cancellationToken);
  371. }
  372. public override Task<TProjection> FindOneAndUpdateAsync<TProjection>(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  373. {
  374. return UsingImplicitSessionAsync(session => FindOneAndUpdateAsync(session, filter, update, options, cancellationToken), cancellationToken);
  375. }
  376. public override Task<TProjection> FindOneAndUpdateAsync<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options, CancellationToken cancellationToken = default(CancellationToken))
  377. {
  378. Ensure.IsNotNull(session, nameof(session));
  379. Ensure.IsNotNull(filter, nameof(filter));
  380. Ensure.IsNotNull(update, nameof(update));
  381. options = options ?? new FindOneAndUpdateOptions<TDocument, TProjection>();
  382. var operation = CreateFindOneAndUpdateOperation(filter, update, options);
  383. return ExecuteWriteOperationAsync(session, operation, cancellationToken);
  384. }
  385. public override IAsyncCursor<TResult> MapReduce<TResult>(BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options = null, CancellationToken cancellationToken = default(CancellationToken))
  386. {
  387. return UsingImplicitSession(session => MapReduce(session, map, reduce, options, cancellationToken), cancellationToken);
  388. }
  389. public override IAsyncCursor<TResult> MapReduce<TResult>(IClientSessionHandle session, BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options = null, CancellationToken cancellationToken = default(CancellationToken))
  390. {
  391. Ensure.IsNotNull(session, nameof(session));
  392. Ensure.IsNotNull(map, nameof(map));
  393. Ensure.IsNotNull(reduce, nameof(reduce));
  394. options = options ?? new MapReduceOptions<TDocument, TResult>();
  395. var outputOptions = options.OutputOptions ?? MapReduceOutputOptions.Inline;
  396. var resultSerializer = ResolveResultSerializer<TResult>(options.ResultSerializer);
  397. if (outputOptions == MapReduceOutputOptions.Inline)
  398. {
  399. var operation = CreateMapReduceOperation(map, reduce, options, resultSerializer);
  400. return ExecuteReadOperation(session, operation, cancellationToken);
  401. }
  402. else
  403. {
  404. var mapReduceOperation = CreateMapReduceOutputToCollectionOperation(map, reduce, options, outputOptions);
  405. ExecuteWriteOperation(session, mapReduceOperation, cancellationToken);
  406. // we want to delay execution of the find because the user may
  407. // not want to iterate the results at all...
  408. var findOperation = CreateMapReduceOutputToCollectionFindOperation<TResult>(options, mapReduceOperation.OutputCollectionNamespace, resultSerializer);
  409. var forkedSession = session.Fork();
  410. var deferredCursor = new DeferredAsyncCursor<TResult>(
  411. () => forkedSession.Dispose(),
  412. ct => ExecuteReadOperation(forkedSession, findOperation, ReadPreference.Primary, ct),
  413. ct => ExecuteReadOperationAsync(forkedSession, findOperation, ReadPreference.Primary, ct));
  414. return deferredCursor;
  415. }
  416. }
  417. public override Task<IAsyncCursor<TResult>> MapReduceAsync<TResult>(BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options = null, CancellationToken cancellationToken = default(CancellationToken))
  418. {
  419. return UsingImplicitSessionAsync(session => MapReduceAsync(session, map, reduce, options, cancellationToken), cancellationToken);
  420. }
  421. public override async Task<IAsyncCursor<TResult>> MapReduceAsync<TResult>(IClientSessionHandle session, BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options = null, CancellationToken cancellationToken = default(CancellationToken))
  422. {
  423. Ensure.IsNotNull(session, nameof(session));
  424. Ensure.IsNotNull(map, nameof(map));
  425. Ensure.IsNotNull(reduce, nameof(reduce));
  426. options = options ?? new MapReduceOptions<TDocument, TResult>();
  427. var outputOptions = options.OutputOptions ?? MapReduceOutputOptions.Inline;
  428. var resultSerializer = ResolveResultSerializer<TResult>(options.ResultSerializer);
  429. if (outputOptions == MapReduceOutputOptions.Inline)
  430. {
  431. var operation = CreateMapReduceOperation(map, reduce, options, resultSerializer);
  432. return await ExecuteReadOperationAsync(session, operation, cancellationToken).ConfigureAwait(false);
  433. }
  434. else
  435. {
  436. var mapReduceOperation = CreateMapReduceOutputToCollectionOperation(map, reduce, options, outputOptions);
  437. await ExecuteWriteOperationAsync(session, mapReduceOperation, cancellationToken).ConfigureAwait(false);
  438. // we want to delay execution of the find because the user may
  439. // not want to iterate the results at all...
  440. var findOperation = CreateMapReduceOutputToCollectionFindOperation<TResult>(options, mapReduceOperation.OutputCollectionNamespace, resultSerializer);
  441. var forkedSession = session.Fork();
  442. var deferredCursor = new DeferredAsyncCursor<TResult>(
  443. () => forkedSession.Dispose(),
  444. ct => ExecuteReadOperation(forkedSession, findOperation, ReadPreference.Primary, ct),
  445. ct => ExecuteReadOperationAsync(forkedSession, findOperation, ReadPreference.Primary, ct));
  446. return await Task.FromResult(deferredCursor).ConfigureAwait(false);
  447. }
  448. }
  449. public override IFilteredMongoCollection<TDerivedDocument> OfType<TDerivedDocument>()
  450. {
  451. var derivedDocumentSerializer = _settings.SerializerRegistry.GetSerializer<TDerivedDocument>();
  452. var ofTypeSerializer = new OfTypeSerializer<TDocument, TDerivedDocument>(derivedDocumentSerializer);
  453. var derivedDocumentCollection = new MongoCollectionImpl<TDerivedDocument>(_database, _collectionNamespace, _settings, _cluster, _operationExecutor, ofTypeSerializer);
  454. var rootOfTypeFilter = Builders<TDocument>.Filter.OfType<TDerivedDocument>();
  455. var renderedOfTypeFilter = rootOfTypeFilter.Render(_documentSerializer, _settings.SerializerRegistry);
  456. var ofTypeFilter = new BsonDocumentFilterDefinition<TDerivedDocument>(renderedOfTypeFilter);
  457. return new OfTypeMongoCollection<TDocument, TDerivedDocument>(this, derivedDocumentCollection, ofTypeFilter);
  458. }
  459. public override IAsyncCursor<TResult> Watch<TResult>(
  460. PipelineDefinition<ChangeStreamDocument<TDocument>, TResult> pipeline,
  461. ChangeStreamOptions options = null,
  462. CancellationToken cancellationToken = default(CancellationToken))
  463. {
  464. return UsingImplicitSession(session => Watch(session, pipeline, options, cancellationToken), cancellationToken);
  465. }
  466. public override IAsyncCursor<TResult> Watch<TResult>(
  467. IClientSessionHandle session,
  468. PipelineDefinition<ChangeStreamDocument<TDocument>, TResult> pipeline,
  469. ChangeStreamOptions options = null,
  470. CancellationToken cancellationToken = default(CancellationToken))
  471. {
  472. Ensure.IsNotNull(session, nameof(session));
  473. Ensure.IsNotNull(pipeline, nameof(pipeline));
  474. var operation = CreateChangeStreamOperation(pipeline, options);
  475. return ExecuteReadOperation(session, operation, cancellationToken);
  476. }
  477. public override Task<IAsyncCursor<TResult>> WatchAsync<TResult>(
  478. PipelineDefinition<ChangeStreamDocument<TDocument>, TResult> pipeline,
  479. ChangeStreamOptions options = null,
  480. CancellationToken cancellationToken = default(CancellationToken))
  481. {
  482. return UsingImplicitSessionAsync(session => WatchAsync(session, pipeline, options, cancellationToken), cancellationToken);
  483. }
  484. public override async Task<IAsyncCursor<TResult>> WatchAsync<TResult>(
  485. IClientSessionHandle session,
  486. PipelineDefinition<ChangeStreamDocument<TDocument>, TResult> pipeline,
  487. ChangeStreamOptions options = null,
  488. CancellationToken cancellationToken = default(CancellationToken))
  489. {
  490. Ensure.IsNotNull(session, nameof(session));
  491. Ensure.IsNotNull(pipeline, nameof(pipeline));
  492. var operation = CreateChangeStreamOperation(pipeline, options);
  493. return await ExecuteReadOperationAsync(session, operation, cancellationToken).ConfigureAwait(false);
  494. }
  495. public override IMongoCollection<TDocument> WithReadConcern(ReadConcern readConcern)
  496. {
  497. var newSettings = _settings.Clone();
  498. newSettings.ReadConcern = readConcern;
  499. return new MongoCollectionImpl<TDocument>(_database, _collectionNamespace, newSettings, _cluster, _operationExecutor);
  500. }
  501. public override IMongoCollection<TDocument> WithReadPreference(ReadPreference readPreference)
  502. {
  503. var newSettings = _settings.Clone();
  504. newSettings.ReadPreference = readPreference;
  505. return new MongoCollectionImpl<TDocument>(_database, _collectionNamespace, newSettings, _cluster, _operationExecutor);
  506. }
  507. public override IMongoCollection<TDocument> WithWriteConcern(WriteConcern writeConcern)
  508. {
  509. var newSettings = _settings.Clone();
  510. newSettings.WriteConcern = writeConcern;
  511. return new MongoCollectionImpl<TDocument>(_database, _collectionNamespace, newSettings, _cluster, _operationExecutor);
  512. }
  513. // private methods
  514. private void AssignId(TDocument document)
  515. {
  516. var idProvider = _documentSerializer as IBsonIdProvider;
  517. if (idProvider != null)
  518. {
  519. object id;
  520. Type idNominalType;
  521. IIdGenerator idGenerator;
  522. if (idProvider.GetDocumentId(document, out id, out idNominalType, out idGenerator))
  523. {
  524. if (idGenerator != null && idGenerator.IsEmpty(id))
  525. {
  526. id = idGenerator.GenerateId(this, document);
  527. idProvider.SetDocumentId(document, id);
  528. }
  529. }
  530. }
  531. }
  532. private WriteRequest ConvertWriteModelToWriteRequest(WriteModel<TDocument> model, int index)
  533. {
  534. switch (model.ModelType)
  535. {
  536. case WriteModelType.InsertOne:
  537. var insertOneModel = (InsertOneModel<TDocument>)model;
  538. if (_settings.AssignIdOnInsert)
  539. {
  540. AssignId(insertOneModel.Document);
  541. }
  542. return new InsertRequest(new BsonDocumentWrapper(insertOneModel.Document, _documentSerializer))
  543. {
  544. CorrelationId = index
  545. };
  546. case WriteModelType.DeleteMany:
  547. var deleteManyModel = (DeleteManyModel<TDocument>)model;
  548. return new DeleteRequest(deleteManyModel.Filter.Render(_documentSerializer, _settings.SerializerRegistry))
  549. {
  550. CorrelationId = index,
  551. Collation = deleteManyModel.Collation,
  552. Limit = 0
  553. };
  554. case WriteModelType.DeleteOne:
  555. var deleteOneModel = (DeleteOneModel<TDocument>)model;
  556. return new DeleteRequest(deleteOneModel.Filter.Render(_documentSerializer, _settings.SerializerRegistry))
  557. {
  558. CorrelationId = index,
  559. Collation = deleteOneModel.Collation,
  560. Limit = 1
  561. };
  562. case WriteModelType.ReplaceOne:
  563. var replaceOneModel = (ReplaceOneModel<TDocument>)model;
  564. return new UpdateRequest(
  565. UpdateType.Replacement,
  566. replaceOneModel.Filter.Render(_documentSerializer, _settings.SerializerRegistry),
  567. new BsonDocumentWrapper(replaceOneModel.Replacement, _documentSerializer))
  568. {
  569. Collation = replaceOneModel.Collation,
  570. CorrelationId = index,
  571. IsMulti = false,
  572. IsUpsert = replaceOneModel.IsUpsert
  573. };
  574. case WriteModelType.UpdateMany:
  575. var updateManyModel = (UpdateManyModel<TDocument>)model;
  576. return new UpdateRequest(
  577. UpdateType.Update,
  578. updateManyModel.Filter.Render(_documentSerializer, _settings.SerializerRegistry),
  579. updateManyModel.Update.Render(_documentSerializer, _settings.SerializerRegistry))
  580. {
  581. ArrayFilters = RenderArrayFilters(updateManyModel.ArrayFilters),
  582. Collation = updateManyModel.Collation,
  583. CorrelationId = index,
  584. IsMulti = true,
  585. IsUpsert = updateManyModel.IsUpsert
  586. };
  587. case WriteModelType.UpdateOne:
  588. var updateOneModel = (UpdateOneModel<TDocument>)model;
  589. return new UpdateRequest(
  590. UpdateType.Update,
  591. updateOneModel.Filter.Render(_documentSerializer, _settings.SerializerRegistry),
  592. updateOneModel.Update.Render(_documentSerializer, _settings.SerializerRegistry))
  593. {
  594. ArrayFilters = RenderArrayFilters(updateOneModel.ArrayFilters),
  595. Collation = updateOneModel.Collation,
  596. CorrelationId = index,
  597. IsMulti = false,
  598. IsUpsert = updateOneModel.IsUpsert
  599. };
  600. default:
  601. throw new InvalidOperationException("Unknown type of WriteModel provided.");
  602. }
  603. }
  604. private AggregateOperation<TResult> CreateAggregateOperation<TResult>(RenderedPipelineDefinition<TResult> renderedPipeline, AggregateOptions options)
  605. {
  606. return new AggregateOperation<TResult>(
  607. _collectionNamespace,
  608. renderedPipeline.Documents,
  609. renderedPipeline.OutputSerializer,
  610. _messageEncoderSettings)
  611. {
  612. AllowDiskUse = options.AllowDiskUse,
  613. BatchSize = options.BatchSize,
  614. Collation = options.Collation,
  615. Comment = options.Comment,
  616. Hint = options.Hint,
  617. MaxAwaitTime = options.MaxAwaitTime,
  618. MaxTime = options.MaxTime,
  619. ReadConcern = _settings.ReadConcern,
  620. UseCursor = options.UseCursor
  621. };
  622. }
  623. private FindOperation<TResult> CreateAggregateToCollectionFindOperation<TResult>(BsonDocument outStage, IBsonSerializer<TResult> resultSerializer, AggregateOptions options)
  624. {
  625. var outputCollectionName = outStage.GetElement(0).Value.AsString;
  626. return new FindOperation<TResult>(
  627. new CollectionNamespace(_collectionNamespace.DatabaseNamespace, outputCollectionName),
  628. resultSerializer,
  629. _messageEncoderSettings)
  630. {
  631. BatchSize = options.BatchSize,
  632. Collation = options.Collation,
  633. MaxTime = options.MaxTime,
  634. ReadConcern = _settings.ReadConcern
  635. };
  636. }
  637. private AggregateToCollectionOperation CreateAggregateToCollectionOperation<TResult>(RenderedPipelineDefinition<TResult> renderedPipeline, AggregateOptions options)
  638. {
  639. return new AggregateToCollectionOperation(
  640. _collectionNamespace,
  641. renderedPipeline.Documents,
  642. _messageEncoderSettings)
  643. {
  644. AllowDiskUse = options.AllowDiskUse,
  645. BypassDocumentValidation = options.BypassDocumentValidation,
  646. Collation = options.Collation,
  647. Comment = options.Comment,
  648. Hint = options.Hint,
  649. MaxTime = options.MaxTime,
  650. WriteConcern = _settings.WriteConcern
  651. };
  652. }
  653. private BulkMixedWriteOperation CreateBulkWriteOperation(IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options)
  654. {
  655. return new BulkMixedWriteOperation(
  656. _collectionNamespace,
  657. requests.Select(ConvertWriteModelToWriteRequest),
  658. _messageEncoderSettings)
  659. {
  660. BypassDocumentValidation = options.BypassDocumentValidation,
  661. IsOrdered = options.IsOrdered,
  662. RetryRequested = _database.Client.Settings.RetryWrites,
  663. WriteConcern = _settings.WriteConcern
  664. };
  665. }
  666. private ChangeStreamOperation<TResult> CreateChangeStreamOperation<TResult>(
  667. PipelineDefinition<ChangeStreamDocument<TDocument>, TResult> pipeline,
  668. ChangeStreamOptions options)
  669. {
  670. return ChangeStreamHelper.CreateChangeStreamOperation(this, pipeline, _documentSerializer, options, _settings.ReadConcern, _messageEncoderSettings);
  671. }
  672. private CountDocumentsOperation CreateCountDocumentsOperation(FilterDefinition<TDocument> filter, CountOptions options)
  673. {
  674. return new CountDocumentsOperation(_collectionNamespace, _messageEncoderSettings)
  675. {
  676. Collation = options.Collation,
  677. Filter = filter.Render(_documentSerializer, _settings.SerializerRegistry),
  678. Hint = options.Hint,
  679. Limit = options.Limit,
  680. MaxTime = options.MaxTime,
  681. ReadConcern = _settings.ReadConcern,
  682. Skip = options.Skip
  683. };
  684. }
  685. private CountOperation CreateCountOperation(FilterDefinition<TDocument> filter, CountOptions options)
  686. {
  687. return new CountOperation(_collectionNamespace, _messageEncoderSettings)
  688. {
  689. Collation = options.Collation,
  690. Filter = filter.Render(_documentSerializer, _settings.SerializerRegistry),
  691. Hint = options.Hint,
  692. Limit = options.Limit,
  693. MaxTime = options.MaxTime,
  694. ReadConcern = _settings.ReadConcern,
  695. Skip = options.Skip
  696. };
  697. }
  698. private DistinctOperation<TField> CreateDistinctOperation<TField>(FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options)
  699. {
  700. var renderedField = field.Render(_documentSerializer, _settings.SerializerRegistry);
  701. var valueSerializer = GetValueSerializerForDistinct(renderedField, _settings.SerializerRegistry);
  702. return new DistinctOperation<TField>(
  703. _collectionNamespace,
  704. valueSerializer,
  705. renderedField.FieldName,
  706. _messageEncoderSettings)
  707. {
  708. Collation = options.Collation,
  709. Filter = filter.Render(_documentSerializer, _settings.SerializerRegistry),
  710. MaxTime = options.MaxTime,
  711. ReadConcern = _settings.ReadConcern
  712. };
  713. }
  714. private CountOperation CreateEstimatedDocumentCountOperation(EstimatedDocumentCountOptions options)
  715. {
  716. return new CountOperation(_collectionNamespace, _messageEncoderSettings)
  717. {
  718. MaxTime = options?.MaxTime
  719. };
  720. }
  721. private FindOneAndDeleteOperation<TProjection> CreateFindOneAndDeleteOperation<TProjection>(FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options)
  722. {
  723. var projection = options.Projection ?? new ClientSideDeserializationProjectionDefinition<TDocument, TProjection>();
  724. var renderedProjection = projection.Render(_documentSerializer, _settings.SerializerRegistry);
  725. return new FindOneAndDeleteOperation<TProjection>(
  726. _collectionNamespace,
  727. filter.Render(_documentSerializer, _settings.SerializerRegistry),
  728. new FindAndModifyValueDeserializer<TProjection>(renderedProjection.ProjectionSerializer),
  729. _messageEncoderSettings)
  730. {
  731. Collation = options.Collation,
  732. MaxTime = options.MaxTime,
  733. Projection = renderedProjection.Document,
  734. Sort = options.Sort == null ? null : options.Sort.Render(_documentSerializer, _settings.SerializerRegistry),
  735. WriteConcern = _settings.WriteConcern,
  736. RetryRequested = _database.Client.Settings.RetryWrites
  737. };
  738. }
  739. private FindOneAndReplaceOperation<TProjection> CreateFindOneAndReplaceOperation<TProjection>(FilterDefinition<TDocument> filter, object replacementObject, FindOneAndReplaceOptions<TDocument, TProjection> options)
  740. {
  741. var projection = options.Projection ?? new ClientSideDeserializationProjectionDefinition<TDocument, TProjection>();
  742. var renderedProjection = projection.Render(_documentSerializer, _settings.SerializerRegistry);
  743. return new FindOneAndReplaceOperation<TProjection>(
  744. _collectionNamespace,
  745. filter.Render(_documentSerializer, _settings.SerializerRegistry),
  746. new BsonDocumentWrapper(replacementObject, _documentSerializer),
  747. new FindAndModifyValueDeserializer<TProjection>(renderedProjection.ProjectionSerializer),
  748. _messageEncoderSettings)
  749. {
  750. BypassDocumentValidation = options.BypassDocumentValidation,
  751. Collation = options.Collation,
  752. IsUpsert = options.IsUpsert,
  753. MaxTime = options.MaxTime,
  754. Projection = renderedProjection.Document,
  755. ReturnDocument = options.ReturnDocument.ToCore(),
  756. Sort = options.Sort == null ? null : options.Sort.Render(_documentSerializer, _settings.SerializerRegistry),
  757. WriteConcern = _settings.WriteConcern,
  758. RetryRequested = _database.Client.Settings.RetryWrites
  759. };
  760. }
  761. private FindOneAndUpdateOperation<TProjection> CreateFindOneAndUpdateOperation<TProjection>(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options)
  762. {
  763. var projection = options.Projection ?? new ClientSideDeserializationProjectionDefinition<TDocument, TProjection>();
  764. var renderedProjection = projection.Render(_documentSerializer, _settings.SerializerRegistry);
  765. return new FindOneAndUpdateOperation<TProjection>(
  766. _collectionNamespace,
  767. filter.Render(_documentSerializer, _settings.SerializerRegistry),
  768. update.Render(_documentSerializer, _settings.SerializerRegistry),
  769. new FindAndModifyValueDeserializer<TProjection>(renderedProjection.ProjectionSerializer),
  770. _messageEncoderSettings)
  771. {
  772. ArrayFilters = RenderArrayFilters(options.ArrayFilters),
  773. BypassDocumentValidation = options.BypassDocumentValidation,
  774. Collation = options.Collation,
  775. IsUpsert = options.IsUpsert,
  776. MaxTime = options.MaxTime,
  777. Projection = renderedProjection.Document,
  778. ReturnDocument = options.ReturnDocument.ToCore(),
  779. Sort = options.Sort == null ? null : options.Sort.Render(_documentSerializer, _settings.SerializerRegistry),
  780. WriteConcern = _settings.WriteConcern,
  781. RetryRequested = _database.Client.Settings.RetryWrites
  782. };
  783. }
  784. private FindOperation<TProjection> CreateFindOperation<TProjection>(FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options)
  785. {
  786. var projection = options.Projection ?? new ClientSideDeserializationProjectionDefinition<TDocument, TProjection>();
  787. var renderedProjection = projection.Render(_documentSerializer, _settings.SerializerRegistry);
  788. return new FindOperation<TProjection>(
  789. _collectionNamespace,
  790. renderedProjection.ProjectionSerializer,
  791. _messageEncoderSettings)
  792. {
  793. AllowPartialResults = options.AllowPartialResults,
  794. BatchSize = options.BatchSize,
  795. Collation = options.Collation,
  796. Comment = options.Comment,
  797. CursorType = options.CursorType.ToCore(),
  798. Filter = filter.Render(_documentSerializer, _settings.SerializerRegistry),
  799. Limit = options.Limit,
  800. MaxAwaitTime = options.MaxAwaitTime,
  801. MaxTime = options.MaxTime,
  802. Modifiers = options.Modifiers,
  803. NoCursorTimeout = options.NoCursorTimeout,
  804. OplogReplay = options.OplogReplay,
  805. Projection = renderedProjection.Document,
  806. ReadConcern = _settings.ReadConcern,
  807. Skip = options.Skip,
  808. Sort = options.Sort == null ? null : options.Sort.Render(_documentSerializer, _settings.SerializerRegistry)
  809. };
  810. }
  811. private MapReduceOperation<TResult> CreateMapReduceOperation<TResult>(BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options, IBsonSerializer<TResult> resultSerializer)
  812. {
  813. return new MapReduceOperation<TResult>(
  814. _collectionNamespace,
  815. map,
  816. reduce,
  817. resultSerializer,
  818. _messageEncoderSettings)
  819. {
  820. Collation = options.Collation,
  821. Filter = options.Filter == null ? null : options.Filter.Render(_documentSerializer, _settings.SerializerRegistry),
  822. FinalizeFunction = options.Finalize,
  823. JavaScriptMode = options.JavaScriptMode,
  824. Limit = options.Limit,
  825. MaxTime = options.MaxTime,
  826. ReadConcern = _settings.ReadConcern,
  827. Scope = options.Scope,
  828. Sort = options.Sort == null ? null : options.Sort.Render(_documentSerializer, _settings.SerializerRegistry),
  829. Verbose = options.Verbose
  830. };
  831. }
  832. private MapReduceOutputToCollectionOperation CreateMapReduceOutputToCollectionOperation<TResult>(BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options, MapReduceOutputOptions outputOptions)
  833. {
  834. var collectionOutputOptions = (MapReduceOutputOptions.CollectionOutput)outputOptions;
  835. var databaseNamespace = collectionOutputOptions.DatabaseName == null ?
  836. _collectionNamespace.DatabaseNamespace :
  837. new DatabaseNamespace(collectionOutputOptions.DatabaseName);
  838. var outputCollectionNamespace = new CollectionNamespace(databaseNamespace, collectionOutputOptions.CollectionName);
  839. return new MapReduceOutputToCollectionOperation(
  840. _collectionNamespace,
  841. outputCollectionNamespace,
  842. map,
  843. reduce,
  844. _messageEncoderSettings)
  845. {
  846. BypassDocumentValidation = options.BypassDocumentValidation,
  847. Collation = options.Collation,
  848. Filter = options.Filter == null ? null : options.Filter.Render(_documentSerializer, _settings.SerializerRegistry),
  849. FinalizeFunction = options.Finalize,
  850. JavaScriptMode = options.JavaScriptMode,
  851. Limit = options.Limit,
  852. MaxTime = options.MaxTime,
  853. NonAtomicOutput = collectionOutputOptions.NonAtomic,
  854. Scope = options.Scope,
  855. OutputMode = collectionOutputOptions.OutputMode,
  856. ShardedOutput = collectionOutputOptions.Sharded,
  857. Sort = options.Sort == null ? null : options.Sort.Render(_documentSerializer, _settings.SerializerRegistry),
  858. Verbose = options.Verbose,
  859. WriteConcern = _settings.WriteConcern
  860. };
  861. }
  862. private FindOperation<TResult> CreateMapReduceOutputToCollectionFindOperation<TResult>(MapReduceOptions<TDocument, TResult> options, CollectionNamespace outputCollectionNamespace, IBsonSerializer<TResult> resultSerializer)
  863. {
  864. return new FindOperation<TResult>(
  865. outputCollectionNamespace,
  866. resultSerializer,
  867. _messageEncoderSettings)
  868. {
  869. Collation = options.Collation,
  870. MaxTime = options.MaxTime,
  871. ReadConcern = _settings.ReadConcern
  872. };
  873. }
  874. private IReadBindingHandle CreateReadBinding(IClientSessionHandle session, ReadPreference readPreference)
  875. {
  876. if (session.IsInTransaction && readPreference.ReadPreferenceMode != ReadPreferenceMode.Primary)
  877. {
  878. throw new InvalidOperationException("Read preference in a transaction must be primary.");
  879. }
  880. var binding = new ReadPreferenceBinding(_cluster, readPreference, session.WrappedCoreSession.Fork());
  881. return new ReadBindingHandle(binding);
  882. }
  883. private IWriteBindingHandle CreateReadWriteBinding(IClientSessionHandle session)
  884. {
  885. var binding = new WritableServerBinding(_cluster, session.WrappedCoreSession.Fork());
  886. return new ReadWriteBindingHandle(binding);
  887. }
  888. private IBsonSerializer<TField> GetValueSerializerForDistinct<TField>(RenderedFieldDefinition<TField> renderedField, IBsonSerializerRegistry serializerRegistry)
  889. {
  890. if (renderedField.UnderlyingSerializer != null)
  891. {
  892. if (renderedField.UnderlyingSerializer.ValueType == typeof(TField))
  893. {
  894. return (IBsonSerializer<TField>)renderedField.UnderlyingSerializer;
  895. }
  896. var arraySerializer = renderedField.UnderlyingSerializer as IBsonArraySerializer;
  897. if (arraySerializer != null)
  898. {
  899. BsonSerializationInfo itemSerializationInfo;
  900. if (arraySerializer.TryGetItemSerializationInfo(out itemSerializationInfo))
  901. {
  902. if (itemSerializationInfo.Serializer.ValueType == typeof(TField))
  903. {
  904. return (IBsonSerializer<TField>)itemSerializationInfo.Serializer;
  905. }
  906. }
  907. }
  908. }
  909. return serializerRegistry.GetSerializer<TField>();
  910. }
  911. private TResult ExecuteReadOperation<TResult>(IClientSessionHandle session, IReadOperation<TResult> operation, CancellationToken cancellationToken = default(CancellationToken))
  912. {
  913. var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, _settings.ReadPreference);
  914. return ExecuteReadOperation(session, operation, effectiveReadPreference, cancellationToken);
  915. }
  916. private TResult ExecuteReadOperation<TResult>(IClientSessionHandle session, IReadOperation<TResult> operation, ReadPreference readPreference, CancellationToken cancellationToken = default(CancellationToken))
  917. {
  918. using (var binding = CreateReadBinding(session, readPreference))
  919. {
  920. return _operationExecutor.ExecuteReadOperation(binding, operation, cancellationToken);
  921. }
  922. }
  923. private Task<TResult> ExecuteReadOperationAsync<TResult>(IClientSessionHandle session, IReadOperation<TResult> operation, CancellationToken cancellationToken = default(CancellationToken))
  924. {
  925. var effectiveReadPreference = ReadPreferenceResolver.GetEffectiveReadPreference(session, null, _settings.ReadPreference);
  926. return ExecuteReadOperationAsync(session, operation, effectiveReadPreference, cancellationToken);
  927. }
  928. private async Task<TResult> ExecuteReadOperationAsync<TResult>(IClientSessionHandle session, IReadOperation<TResult> operation, ReadPreference readPreference, CancellationToken cancellationToken = default(CancellationToken))
  929. {
  930. using (var binding = CreateReadBinding(session, readPreference))
  931. {
  932. return await _operationExecutor.ExecuteReadOperationAsync(binding, operation, cancellationToken).ConfigureAwait(false);
  933. }
  934. }
  935. private TResult ExecuteWriteOperation<TResult>(IClientSessionHandle session, IWriteOperation<TResult> operation, CancellationToken cancellationToken = default(CancellationToken))
  936. {
  937. using (var binding = CreateReadWriteBinding(session))
  938. {
  939. return _operationExecutor.ExecuteWriteOperation(binding, operation, cancellationToken);
  940. }
  941. }
  942. private async Task<TResult> ExecuteWriteOperationAsync<TResult>(IClientSessionHandle session, IWriteOperation<TResult> operation, CancellationToken cancellationToken = default(CancellationToken))
  943. {
  944. using (var binding = CreateReadWriteBinding(session))
  945. {
  946. return await _operationExecutor.ExecuteWriteOperationAsync(binding, operation, cancellationToken).ConfigureAwait(false);
  947. }
  948. }
  949. private IEnumerable<BsonDocument> RenderArrayFilters(IEnumerable<ArrayFilterDefinition> arrayFilters)
  950. {
  951. if (arrayFilters == null)
  952. {
  953. return null;
  954. }
  955. var renderedArrayFilters = new List<BsonDocument>();
  956. foreach (var arrayFilter in arrayFilters)
  957. {
  958. var renderedArrayFilter = arrayFilter.Render(null, _settings.SerializerRegistry);
  959. renderedArrayFilters.Add(renderedArrayFilter);
  960. }
  961. return renderedArrayFilters;
  962. }
  963. private IBsonSerializer<TResult> ResolveResultSerializer<TResult>(IBsonSerializer<TResult> resultSerializer)
  964. {
  965. if (resultSerializer != null)
  966. {
  967. return resultSerializer;
  968. }
  969. if (typeof(TResult) == typeof(TDocument) && _documentSerializer != null)
  970. {
  971. return (IBsonSerializer<TResult>)_documentSerializer;
  972. }
  973. return _settings.SerializerRegistry.GetSerializer<TResult>();
  974. }
  975. private void UsingImplicitSession(Action<IClientSessionHandle> func, CancellationToken cancellationToken = default(CancellationToken))
  976. {
  977. using (var session = _operationExecutor.StartImplicitSession(cancellationToken))
  978. {
  979. func(session);
  980. }
  981. }
  982. private TResult UsingImplicitSession<TResult>(Func<IClientSessionHandle, TResult> func, CancellationToken cancellationToken = default(CancellationToken))
  983. {
  984. using (var session = _operationExecutor.StartImplicitSession(cancellationToken))
  985. {
  986. return func(session);
  987. }
  988. }
  989. private async Task UsingImplicitSessionAsync(Func<IClientSessionHandle, Task> funcAsync, CancellationToken cancellationToken = default(CancellationToken))
  990. {
  991. using (var session = await _operationExecutor.StartImplicitSessionAsync(cancellationToken).ConfigureAwait(false))
  992. {
  993. await funcAsync(session).ConfigureAwait(false);
  994. }
  995. }
  996. private async Task<TResult> UsingImplicitSessionAsync<TResult>(Func<IClientSessionHandle, Task<TResult>> funcAsync, CancellationToken cancellationToken = default(CancellationToken))
  997. {
  998. using (var session = await _operationExecutor.StartImplicitSessionAsync(cancellationToken).ConfigureAwait(false))
  999. {
  1000. return await funcAsync(session).ConfigureAwait(false);
  1001. }
  1002. }
  1003. // nested types
  1004. private class MongoIndexManager : MongoIndexManagerBase<TDocument>
  1005. {
  1006. // private fields
  1007. private readonly MongoCollectionImpl<TDocument> _collection;
  1008. // constructors
  1009. public MongoIndexManager(MongoCollectionImpl<TDocument> collection)
  1010. {
  1011. _collection = collection;
  1012. }
  1013. // public properties
  1014. public override CollectionNamespace CollectionNamespace
  1015. {
  1016. get { return _collection.CollectionNamespace; }
  1017. }
  1018. public override IBsonSerializer<TDocument> DocumentSerializer
  1019. {
  1020. get { return _collection.DocumentSerializer; }
  1021. }
  1022. public override MongoCollectionSettings Settings
  1023. {
  1024. get { return _collection._settings; }
  1025. }
  1026. // public methods
  1027. public override IEnumerable<string> CreateMany(IEnumerable<CreateIndexModel<TDocument>> models, CancellationToken cancellationToken = default(CancellationToken))
  1028. {
  1029. return CreateMany(models, null, cancellationToken);
  1030. }
  1031. public override IEnumerable<string> CreateMany(
  1032. IEnumerable<CreateIndexModel<TDocument>> models,
  1033. CreateManyIndexesOptions options,
  1034. CancellationToken cancellationToken = default(CancellationToken))
  1035. {
  1036. return _collection.UsingImplicitSession(session => CreateMany(session, models, options, cancellationToken), cancellationToken);
  1037. }
  1038. public override IEnumerable<string> CreateMany(IClientSessionHandle session, IEnumerable<CreateIndexModel<TDocument>> models, CancellationToken cancellationToken = default(CancellationToken))
  1039. {
  1040. return CreateMany(session, models, null, cancellationToken);
  1041. }
  1042. public override IEnumerable<string> CreateMany(
  1043. IClientSessionHandle session,
  1044. IEnumerable<CreateIndexModel<TDocument>> models,
  1045. CreateManyIndexesOptions options,
  1046. CancellationToken cancellationToken = default(CancellationToken))
  1047. {
  1048. Ensure.IsNotNull(session, nameof(session));
  1049. Ensure.IsNotNull(models, nameof(models));
  1050. var requests = CreateCreateIndexRequests(models);
  1051. var operation = CreateCreateIndexesOperation(requests, options);
  1052. _collection.ExecuteWriteOperation(session, operation, cancellationToken);
  1053. return requests.Select(x => x.GetIndexName());
  1054. }
  1055. public override Task<IEnumerable<string>> CreateManyAsync(IEnumerable<CreateIndexModel<TDocument>> models, CancellationToken cancellationToken = default(CancellationToken))
  1056. {
  1057. return CreateManyAsync(models, null, cancellationToken);
  1058. }
  1059. public override Task<IEnumerable<string>> CreateManyAsync(
  1060. IEnumerable<CreateIndexModel<TDocument>> models,
  1061. CreateManyIndexesOptions options,
  1062. CancellationToken cancellationToken = default(CancellationToken))
  1063. {
  1064. return _collection.UsingImplicitSessionAsync(session => CreateManyAsync(session, models, options, cancellationToken), cancellationToken);
  1065. }
  1066. public override Task<IEnumerable<string>> CreateManyAsync(IClientSessionHandle session, IEnumerable<CreateIndexModel<TDocument>> models, CancellationToken cancellationToken = default(CancellationToken))
  1067. {
  1068. return CreateManyAsync(session, models, null, cancellationToken);
  1069. }
  1070. public override async Task<IEnumerable<string>> CreateManyAsync(
  1071. IClientSessionHandle session,
  1072. IEnumerable<CreateIndexModel<TDocument>> models,
  1073. CreateManyIndexesOptions options,
  1074. CancellationToken cancellationToken = default(CancellationToken))
  1075. {
  1076. Ensure.IsNotNull(session, nameof(session));
  1077. Ensure.IsNotNull(models, nameof(models));
  1078. var requests = CreateCreateIndexRequests(models);
  1079. var operation = CreateCreateIndexesOperation(requests, options);
  1080. await _collection.ExecuteWriteOperationAsync(session, operation, cancellationToken).ConfigureAwait(false);
  1081. return requests.Select(x => x.GetIndexName());
  1082. }
  1083. public override void DropAll(CancellationToken cancellationToken)
  1084. {
  1085. _collection.UsingImplicitSession(session => DropAll(session, cancellationToken), cancellationToken);
  1086. }
  1087. public override void DropAll(DropIndexOptions options, CancellationToken cancellationToken = default(CancellationToken))
  1088. {
  1089. _collection.UsingImplicitSession(session => DropAll(session, options, cancellationToken), cancellationToken);
  1090. }
  1091. public override void DropAll(IClientSessionHandle session, CancellationToken cancellationToken = default(CancellationToken))
  1092. {
  1093. DropAll(session, null, cancellationToken);
  1094. }
  1095. public override void DropAll(IClientSessionHandle session, DropIndexOptions options, CancellationToken cancellationToken = default(CancellationToken))
  1096. {
  1097. Ensure.IsNotNull(session, nameof(session));
  1098. var operation = CreateDropAllOperation(options);
  1099. _collection.ExecuteWriteOperation(session, operation, cancellationToken);
  1100. }
  1101. public override Task DropAllAsync(CancellationToken cancellationToken)
  1102. {
  1103. return _collection.UsingImplicitSessionAsync(session => DropAllAsync(session, cancellationToken), cancellationToken);
  1104. }
  1105. public override Task DropAllAsync(DropIndexOptions options, CancellationToken cancellationToken = default(CancellationToken))
  1106. {
  1107. return _collection.UsingImplicitSessionAsync(session => DropAllAsync(session, options, cancellationToken), cancellationToken);
  1108. }
  1109. public override Task DropAllAsync(IClientSessionHandle session, CancellationToken cancellationToken = default(CancellationToken))
  1110. {
  1111. return DropAllAsync(session, null, cancellationToken);
  1112. }
  1113. public override Task DropAllAsync(IClientSessionHandle session, DropIndexOptions options, CancellationToken cancellationToken = default(CancellationToken))
  1114. {
  1115. Ensure.IsNotNull(session, nameof(session));
  1116. var operation = CreateDropAllOperation(options);
  1117. return _collection.ExecuteWriteOperationAsync(session, operation, cancellationToken);
  1118. }
  1119. public override void DropOne(string name, CancellationToken cancellationToken = default(CancellationToken))
  1120. {
  1121. _collection.UsingImplicitSession(session => DropOne(session, name, cancellationToken), cancellationToken);
  1122. }
  1123. public override void DropOne(string name, DropIndexOptions options, CancellationToken cancellationToken = default(CancellationToken))
  1124. {
  1125. _collection.UsingImplicitSession(session => DropOne(session, name, options, cancellationToken), cancellationToken);
  1126. }
  1127. public override void DropOne(IClientSessionHandle session, string name, CancellationToken cancellationToken = default(CancellationToken))
  1128. {
  1129. DropOne(session, name, null, cancellationToken);
  1130. }
  1131. public override void DropOne(
  1132. IClientSessionHandle session,
  1133. string name,
  1134. DropIndexOptions options,
  1135. CancellationToken cancellationToken)
  1136. {
  1137. Ensure.IsNotNull(session, nameof(session));
  1138. Ensure.IsNotNullOrEmpty(name, nameof(name));
  1139. if (name == "*")
  1140. {
  1141. throw new ArgumentException("Cannot specify '*' for the index name. Use DropAllAsync to drop all indexes.", "name");
  1142. }
  1143. var operation = CreateDropOneOperation(name, options);
  1144. _collection.ExecuteWriteOperation(session, operation, cancellationToken);
  1145. }
  1146. public override Task DropOneAsync(string name, CancellationToken cancellationToken = default(CancellationToken))
  1147. {
  1148. return _collection.UsingImplicitSessionAsync(session => DropOneAsync(session, name, cancellationToken), cancellationToken);
  1149. }
  1150. public override Task DropOneAsync(string name, DropIndexOptions options, CancellationToken cancellationToken = default(CancellationToken))
  1151. {
  1152. return _collection.UsingImplicitSessionAsync(session => DropOneAsync(session, name,options, cancellationToken), cancellationToken);
  1153. }
  1154. public override Task DropOneAsync(IClientSessionHandle session, string name, CancellationToken cancellationToken = default(CancellationToken))
  1155. {
  1156. return DropOneAsync(session, name, null, cancellationToken);
  1157. }
  1158. public override Task DropOneAsync(
  1159. IClientSessionHandle session,
  1160. string name,
  1161. DropIndexOptions options,
  1162. CancellationToken cancellationToken)
  1163. {
  1164. Ensure.IsNotNull(session, nameof(session));
  1165. Ensure.IsNotNullOrEmpty(name, nameof(name));
  1166. if (name == "*")
  1167. {
  1168. throw new ArgumentException("Cannot specify '*' for the index name. Use DropAllAsync to drop all indexes.", "name");
  1169. }
  1170. var operation = CreateDropOneOperation(name, options);
  1171. return _collection.ExecuteWriteOperationAsync(session, operation, cancellationToken);
  1172. }
  1173. public override IAsyncCursor<BsonDocument> List(CancellationToken cancellationToken = default(CancellationToken))
  1174. {
  1175. return _collection.UsingImplicitSession(session => List(session, cancellationToken), cancellationToken);
  1176. }
  1177. public override IAsyncCursor<BsonDocument> List(IClientSessionHandle session, CancellationToken cancellationToken = default(CancellationToken))
  1178. {
  1179. Ensure.IsNotNull(session, nameof(session));
  1180. var operation = CreateListIndexesOperation();
  1181. return _collection.ExecuteReadOperation(session, operation, ReadPreference.Primary, cancellationToken);
  1182. }
  1183. public override Task<IAsyncCursor<BsonDocument>> ListAsync(CancellationToken cancellationToken = default(CancellationToken))
  1184. {
  1185. return _collection.UsingImplicitSessionAsync(session => ListAsync(session, cancellationToken), cancellationToken);
  1186. }
  1187. public override Task<IAsyncCursor<BsonDocument>> ListAsync(IClientSessionHandle session, CancellationToken cancellationToken = default(CancellationToken))
  1188. {
  1189. Ensure.IsNotNull(session, nameof(session));
  1190. var operation = CreateListIndexesOperation();
  1191. return _collection.ExecuteReadOperationAsync(session, operation, ReadPreference.Primary, cancellationToken);
  1192. }
  1193. // private methods
  1194. private CreateIndexesOperation CreateCreateIndexesOperation(IEnumerable<CreateIndexRequest> requests, CreateManyIndexesOptions options)
  1195. {
  1196. return new CreateIndexesOperation(_collection._collectionNamespace, requests, _collection._messageEncoderSettings)
  1197. {
  1198. MaxTime = options?.MaxTime,
  1199. WriteConcern = _collection.Settings.WriteConcern
  1200. };
  1201. }
  1202. private IEnumerable<CreateIndexRequest> CreateCreateIndexRequests(IEnumerable<CreateIndexModel<TDocument>> models)
  1203. {
  1204. return models.Select(m =>
  1205. {
  1206. var options = m.Options ?? new CreateIndexOptions<TDocument>();
  1207. var keysDocument = m.Keys.Render(_collection._documentSerializer, _collection._settings.SerializerRegistry);
  1208. var renderedPartialFilterExpression = options.PartialFilterExpression == null ? null : options.PartialFilterExpression.Render(_collection._documentSerializer, _collection._settings.SerializerRegistry);
  1209. return new CreateIndexRequest(keysDocument)
  1210. {
  1211. Name = options.Name,
  1212. Background = options.Background,
  1213. Bits = options.Bits,
  1214. BucketSize = options.BucketSize,
  1215. Collation = options.Collation,
  1216. DefaultLanguage = options.DefaultLanguage,
  1217. ExpireAfter = options.ExpireAfter,
  1218. LanguageOverride = options.LanguageOverride,
  1219. Max = options.Max,
  1220. Min = options.Min,
  1221. PartialFilterExpression = renderedPartialFilterExpression,
  1222. Sparse = options.Sparse,
  1223. SphereIndexVersion = options.SphereIndexVersion,
  1224. StorageEngine = options.StorageEngine,
  1225. TextIndexVersion = options.TextIndexVersion,
  1226. Unique = options.Unique,
  1227. Version = options.Version,
  1228. Weights = options.Weights
  1229. };
  1230. });
  1231. }
  1232. private DropIndexOperation CreateDropAllOperation(DropIndexOptions options)
  1233. {
  1234. return new DropIndexOperation(_collection._collectionNamespace, "*", _collection._messageEncoderSettings)
  1235. {
  1236. MaxTime = options?.MaxTime,
  1237. WriteConcern = _collection.Settings.WriteConcern
  1238. };
  1239. }
  1240. private DropIndexOperation CreateDropOneOperation(string name, DropIndexOptions options)
  1241. {
  1242. return new DropIndexOperation(_collection._collectionNamespace, name, _collection._messageEncoderSettings)
  1243. {
  1244. MaxTime = options?.MaxTime,
  1245. WriteConcern = _collection.Settings.WriteConcern
  1246. };
  1247. }
  1248. private ListIndexesOperation CreateListIndexesOperation()
  1249. {
  1250. return new ListIndexesOperation(_collection._collectionNamespace, _collection._messageEncoderSettings);
  1251. }
  1252. }
  1253. }
  1254. }