MongoCollectionImpl.cs 50 KB


  1. /* Copyright 2010-2017 MongoDB Inc.
  2. *
  3. * Licensed under the Apache License, Version 2.0 (the "License");
  4. * you may not use this file except in compliance with the License.
  5. * You may obtain a copy of the License at
  6. *
  7. * http://www.apache.org/licenses/LICENSE-2.0
  8. *
  9. * Unless required by applicable law or agreed to in writing, software
  10. * distributed under the License is distributed on an "AS IS" BASIS,
  11. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. * See the License for the specific language governing permissions and
  13. * limitations under the License.
  14. */
  15. using System;
  16. using System.Collections.Generic;
  17. using System.Linq;
  18. using System.Threading;
  19. using System.Threading.Tasks;
  20. using MongoDB.Bson;
  21. using MongoDB.Bson.IO;
  22. using MongoDB.Bson.Serialization;
  23. using MongoDB.Driver.Core.Bindings;
  24. using MongoDB.Driver.Core.Clusters;
  25. using MongoDB.Driver.Core.Misc;
  26. using MongoDB.Driver.Core.Operations;
  27. using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
  28. using MongoDB.Driver.Linq;
  29. namespace MongoDB.Driver
  30. {
  31. internal sealed class MongoCollectionImpl<TDocument> : MongoCollectionBase<TDocument>
  32. {
  33. // fields
  34. private readonly ICluster _cluster;
  35. private readonly CollectionNamespace _collectionNamespace;
  36. private readonly IMongoDatabase _database;
  37. private readonly MessageEncoderSettings _messageEncoderSettings;
  38. private readonly IOperationExecutor _operationExecutor;
  39. private readonly IBsonSerializer<TDocument> _documentSerializer;
  40. private readonly MongoCollectionSettings _settings;
  41. // constructors
  42. public MongoCollectionImpl(IMongoDatabase database, CollectionNamespace collectionNamespace, MongoCollectionSettings settings, ICluster cluster, IOperationExecutor operationExecutor)
  43. : this(database, collectionNamespace, settings, cluster, operationExecutor, Ensure.IsNotNull(settings, "settings").SerializerRegistry.GetSerializer<TDocument>())
  44. {
  45. }
  46. private MongoCollectionImpl(IMongoDatabase database, CollectionNamespace collectionNamespace, MongoCollectionSettings settings, ICluster cluster, IOperationExecutor operationExecutor, IBsonSerializer<TDocument> documentSerializer)
  47. {
  48. _database = Ensure.IsNotNull(database, nameof(database));
  49. _collectionNamespace = Ensure.IsNotNull(collectionNamespace, nameof(collectionNamespace));
  50. _settings = Ensure.IsNotNull(settings, nameof(settings)).Freeze();
  51. _cluster = Ensure.IsNotNull(cluster, nameof(cluster));
  52. _operationExecutor = Ensure.IsNotNull(operationExecutor, nameof(operationExecutor));
  53. _documentSerializer = Ensure.IsNotNull(documentSerializer, nameof(documentSerializer));
  54. _messageEncoderSettings = new MessageEncoderSettings
  55. {
  56. { MessageEncoderSettingsName.GuidRepresentation, _settings.GuidRepresentation },
  57. { MessageEncoderSettingsName.ReadEncoding, _settings.ReadEncoding ?? Utf8Encodings.Strict },
  58. { MessageEncoderSettingsName.WriteEncoding, _settings.WriteEncoding ?? Utf8Encodings.Strict }
  59. };
  60. }
  61. // properties
  62. public override CollectionNamespace CollectionNamespace
  63. {
  64. get { return _collectionNamespace; }
  65. }
  66. public override IMongoDatabase Database
  67. {
  68. get { return _database; }
  69. }
  70. public override IBsonSerializer<TDocument> DocumentSerializer
  71. {
  72. get { return _documentSerializer; }
  73. }
  74. public override IMongoIndexManager<TDocument> Indexes
  75. {
  76. get { return new MongoIndexManager(this); }
  77. }
  78. public override MongoCollectionSettings Settings
  79. {
  80. get { return _settings; }
  81. }
  82. // public methods
  83. public override IAsyncCursor<TResult> Aggregate<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options, CancellationToken cancellationToken)
  84. {
  85. var renderedPipeline = Ensure.IsNotNull(pipeline, nameof(pipeline)).Render(_documentSerializer, _settings.SerializerRegistry);
  86. options = options ?? new AggregateOptions();
  87. var last = renderedPipeline.Documents.LastOrDefault();
  88. if (last != null && last.GetElement(0).Name == "$out")
  89. {
  90. var aggregateOperation = CreateAggregateToCollectionOperation(renderedPipeline, options);
  91. ExecuteWriteOperation(aggregateOperation, cancellationToken);
  92. // we want to delay execution of the find because the user may
  93. // not want to iterate the results at all...
  94. var findOperation = CreateAggregateToCollectionFindOperation(last, renderedPipeline.OutputSerializer, options);
  95. var deferredCursor = new DeferredAsyncCursor<TResult>(
  96. ct => ExecuteReadOperation(findOperation, ReadPreference.Primary, ct),
  97. ct => ExecuteReadOperationAsync(findOperation, ReadPreference.Primary, ct));
  98. return deferredCursor;
  99. }
  100. else
  101. {
  102. var aggregateOperation = CreateAggregateOperation(renderedPipeline, options);
  103. return ExecuteReadOperation(aggregateOperation, cancellationToken);
  104. }
  105. }
  106. public override async Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options, CancellationToken cancellationToken)
  107. {
  108. var renderedPipeline = Ensure.IsNotNull(pipeline, nameof(pipeline)).Render(_documentSerializer, _settings.SerializerRegistry);
  109. options = options ?? new AggregateOptions();
  110. var last = renderedPipeline.Documents.LastOrDefault();
  111. if (last != null && last.GetElement(0).Name == "$out")
  112. {
  113. var aggregateOperation = CreateAggregateToCollectionOperation(renderedPipeline, options);
  114. await ExecuteWriteOperationAsync(aggregateOperation, cancellationToken).ConfigureAwait(false);
  115. // we want to delay execution of the find because the user may
  116. // not want to iterate the results at all...
  117. var findOperation = CreateAggregateToCollectionFindOperation(last, renderedPipeline.OutputSerializer, options);
  118. var deferredCursor = new DeferredAsyncCursor<TResult>(
  119. ct => ExecuteReadOperation(findOperation, ReadPreference.Primary, ct),
  120. ct => ExecuteReadOperationAsync(findOperation, ReadPreference.Primary, ct));
  121. return await Task.FromResult<IAsyncCursor<TResult>>(deferredCursor).ConfigureAwait(false);
  122. }
  123. else
  124. {
  125. var aggregateOperation = CreateAggregateOperation(renderedPipeline, options);
  126. return await ExecuteReadOperationAsync(aggregateOperation, cancellationToken).ConfigureAwait(false);
  127. }
  128. }
  129. public override BulkWriteResult<TDocument> BulkWrite(IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options, CancellationToken cancellationToken)
  130. {
  131. Ensure.IsNotNull(requests, nameof(requests));
  132. if (!requests.Any())
  133. {
  134. throw new ArgumentException("Must contain at least 1 request.", "requests");
  135. }
  136. options = options ?? new BulkWriteOptions();
  137. var operation = CreateBulkWriteOperation(requests, options);
  138. try
  139. {
  140. var result = ExecuteWriteOperation(operation, cancellationToken);
  141. return BulkWriteResult<TDocument>.FromCore(result, requests);
  142. }
  143. catch (MongoBulkWriteOperationException ex)
  144. {
  145. throw MongoBulkWriteException<TDocument>.FromCore(ex, requests.ToList());
  146. }
  147. }
  148. public override async Task<BulkWriteResult<TDocument>> BulkWriteAsync(IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options, CancellationToken cancellationToken)
  149. {
  150. Ensure.IsNotNull(requests, nameof(requests));
  151. if (!requests.Any())
  152. {
  153. throw new ArgumentException("Must contain at least 1 request.", "requests");
  154. }
  155. options = options ?? new BulkWriteOptions();
  156. var operation = CreateBulkWriteOperation(requests, options);
  157. try
  158. {
  159. var result = await ExecuteWriteOperationAsync(operation, cancellationToken).ConfigureAwait(false);
  160. return BulkWriteResult<TDocument>.FromCore(result, requests);
  161. }
  162. catch (MongoBulkWriteOperationException ex)
  163. {
  164. throw MongoBulkWriteException<TDocument>.FromCore(ex, requests.ToList());
  165. }
  166. }
  167. public override long Count(FilterDefinition<TDocument> filter, CountOptions options, CancellationToken cancellationToken)
  168. {
  169. Ensure.IsNotNull(filter, nameof(filter));
  170. options = options ?? new CountOptions();
  171. var operation = CreateCountOperation(filter, options);
  172. return ExecuteReadOperation(operation, cancellationToken);
  173. }
  174. public override Task<long> CountAsync(FilterDefinition<TDocument> filter, CountOptions options, CancellationToken cancellationToken)
  175. {
  176. Ensure.IsNotNull(filter, nameof(filter));
  177. options = options ?? new CountOptions();
  178. var operation = CreateCountOperation(filter, options);
  179. return ExecuteReadOperationAsync(operation, cancellationToken);
  180. }
  181. public override IAsyncCursor<TField> Distinct<TField>(FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options, CancellationToken cancellationToken)
  182. {
  183. Ensure.IsNotNull(field, nameof(field));
  184. Ensure.IsNotNull(filter, nameof(filter));
  185. options = options ?? new DistinctOptions();
  186. var operation = CreateDistinctOperation(field, filter, options);
  187. return ExecuteReadOperation(operation, cancellationToken);
  188. }
  189. public override Task<IAsyncCursor<TField>> DistinctAsync<TField>(FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options, CancellationToken cancellationToken)
  190. {
  191. Ensure.IsNotNull(field, nameof(field));
  192. Ensure.IsNotNull(filter, nameof(filter));
  193. options = options ?? new DistinctOptions();
  194. var operation = CreateDistinctOperation(field, filter, options);
  195. return ExecuteReadOperationAsync(operation, cancellationToken);
  196. }
  197. public override IAsyncCursor<TProjection> FindSync<TProjection>(FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options, CancellationToken cancellationToken)
  198. {
  199. Ensure.IsNotNull(filter, nameof(filter));
  200. options = options ?? new FindOptions<TDocument, TProjection>();
  201. var operation = CreateFindOperation<TProjection>(filter, options);
  202. return ExecuteReadOperation(operation, cancellationToken);
  203. }
  204. public override Task<IAsyncCursor<TProjection>> FindAsync<TProjection>(FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options, CancellationToken cancellationToken)
  205. {
  206. Ensure.IsNotNull(filter, nameof(filter));
  207. options = options ?? new FindOptions<TDocument, TProjection>();
  208. var operation = CreateFindOperation<TProjection>(filter, options);
  209. return ExecuteReadOperationAsync(operation, cancellationToken);
  210. }
  211. public override TProjection FindOneAndDelete<TProjection>(FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options, CancellationToken cancellationToken)
  212. {
  213. Ensure.IsNotNull(filter, nameof(filter));
  214. options = options ?? new FindOneAndDeleteOptions<TDocument, TProjection>();
  215. var operation = CreateFindOneAndDeleteOperation<TProjection>(filter, options);
  216. return ExecuteWriteOperation(operation, cancellationToken);
  217. }
  218. public override Task<TProjection> FindOneAndDeleteAsync<TProjection>(FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options, CancellationToken cancellationToken)
  219. {
  220. Ensure.IsNotNull(filter, nameof(filter));
  221. options = options ?? new FindOneAndDeleteOptions<TDocument, TProjection>();
  222. var operation = CreateFindOneAndDeleteOperation<TProjection>(filter, options);
  223. return ExecuteWriteOperationAsync(operation, cancellationToken);
  224. }
  225. public override TProjection FindOneAndReplace<TProjection>(FilterDefinition<TDocument> filter, TDocument replacement, FindOneAndReplaceOptions<TDocument, TProjection> options, CancellationToken cancellationToken)
  226. {
  227. Ensure.IsNotNull(filter, nameof(filter));
  228. var replacementObject = Ensure.IsNotNull((object)replacement, nameof(replacement)); // only box once if it's a struct
  229. options = options ?? new FindOneAndReplaceOptions<TDocument, TProjection>();
  230. var operation = CreateFindOneAndReplaceOperation(filter, replacementObject, options);
  231. return ExecuteWriteOperation(operation, cancellationToken);
  232. }
  233. public override Task<TProjection> FindOneAndReplaceAsync<TProjection>(FilterDefinition<TDocument> filter, TDocument replacement, FindOneAndReplaceOptions<TDocument, TProjection> options, CancellationToken cancellationToken)
  234. {
  235. Ensure.IsNotNull(filter, nameof(filter));
  236. var replacementObject = Ensure.IsNotNull((object)replacement, nameof(replacement)); // only box once if it's a struct
  237. options = options ?? new FindOneAndReplaceOptions<TDocument, TProjection>();
  238. var operation = CreateFindOneAndReplaceOperation(filter, replacementObject, options);
  239. return ExecuteWriteOperationAsync(operation, cancellationToken);
  240. }
  241. public override TProjection FindOneAndUpdate<TProjection>(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options, CancellationToken cancellationToken)
  242. {
  243. Ensure.IsNotNull(filter, nameof(filter));
  244. Ensure.IsNotNull(update, nameof(update));
  245. options = options ?? new FindOneAndUpdateOptions<TDocument, TProjection>();
  246. var operation = CreateFindOneAndUpdateOperation(filter, update, options);
  247. return ExecuteWriteOperation(operation, cancellationToken);
  248. }
  249. public override Task<TProjection> FindOneAndUpdateAsync<TProjection>(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options, CancellationToken cancellationToken)
  250. {
  251. Ensure.IsNotNull(filter, nameof(filter));
  252. Ensure.IsNotNull(update, nameof(update));
  253. options = options ?? new FindOneAndUpdateOptions<TDocument, TProjection>();
  254. var operation = CreateFindOneAndUpdateOperation(filter, update, options);
  255. return ExecuteWriteOperationAsync(operation, cancellationToken);
  256. }
  257. public override IAsyncCursor<TResult> MapReduce<TResult>(BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options = null, CancellationToken cancellationToken = default(CancellationToken))
  258. {
  259. Ensure.IsNotNull(map, nameof(map));
  260. Ensure.IsNotNull(reduce, nameof(reduce));
  261. options = options ?? new MapReduceOptions<TDocument, TResult>();
  262. var outputOptions = options.OutputOptions ?? MapReduceOutputOptions.Inline;
  263. var resultSerializer = ResolveResultSerializer<TResult>(options.ResultSerializer);
  264. if (outputOptions == MapReduceOutputOptions.Inline)
  265. {
  266. var operation = CreateMapReduceOperation(map, reduce, options, resultSerializer);
  267. return ExecuteReadOperation(operation, cancellationToken);
  268. }
  269. else
  270. {
  271. var mapReduceOperation = CreateMapReduceOutputToCollectionOperation(map, reduce, options, outputOptions);
  272. ExecuteWriteOperation(mapReduceOperation, cancellationToken);
  273. var findOperation = CreateMapReduceOutputToCollectionFindOperation<TResult>(options, mapReduceOperation.OutputCollectionNamespace, resultSerializer);
  274. // we want to delay execution of the find because the user may
  275. // not want to iterate the results at all...
  276. var deferredCursor = new DeferredAsyncCursor<TResult>(
  277. ct => ExecuteReadOperation(findOperation, ReadPreference.Primary, ct),
  278. ct => ExecuteReadOperationAsync(findOperation, ReadPreference.Primary, ct));
  279. return deferredCursor;
  280. }
  281. }
  282. public override async Task<IAsyncCursor<TResult>> MapReduceAsync<TResult>(BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options = null, CancellationToken cancellationToken = default(CancellationToken))
  283. {
  284. Ensure.IsNotNull(map, nameof(map));
  285. Ensure.IsNotNull(reduce, nameof(reduce));
  286. options = options ?? new MapReduceOptions<TDocument, TResult>();
  287. var outputOptions = options.OutputOptions ?? MapReduceOutputOptions.Inline;
  288. var resultSerializer = ResolveResultSerializer<TResult>(options.ResultSerializer);
  289. if (outputOptions == MapReduceOutputOptions.Inline)
  290. {
  291. var operation = CreateMapReduceOperation(map, reduce, options, resultSerializer);
  292. return await ExecuteReadOperationAsync(operation, cancellationToken).ConfigureAwait(false);
  293. }
  294. else
  295. {
  296. var mapReduceOperation = CreateMapReduceOutputToCollectionOperation(map, reduce, options, outputOptions);
  297. await ExecuteWriteOperationAsync(mapReduceOperation, cancellationToken).ConfigureAwait(false);
  298. var findOperation = CreateMapReduceOutputToCollectionFindOperation<TResult>(options, mapReduceOperation.OutputCollectionNamespace, resultSerializer);
  299. // we want to delay execution of the find because the user may
  300. // not want to iterate the results at all...
  301. var deferredCursor = new DeferredAsyncCursor<TResult>(
  302. ct => ExecuteReadOperation(findOperation, ReadPreference.Primary, ct),
  303. ct => ExecuteReadOperationAsync(findOperation, ReadPreference.Primary, ct));
  304. return await Task.FromResult(deferredCursor).ConfigureAwait(false);
  305. }
  306. }
  307. public override IFilteredMongoCollection<TDerivedDocument> OfType<TDerivedDocument>()
  308. {
  309. var derivedDocumentSerializer = _settings.SerializerRegistry.GetSerializer<TDerivedDocument>();
  310. var ofTypeSerializer = new OfTypeSerializer<TDocument, TDerivedDocument>(derivedDocumentSerializer);
  311. var derivedDocumentCollection = new MongoCollectionImpl<TDerivedDocument>(_database, _collectionNamespace, _settings, _cluster, _operationExecutor, ofTypeSerializer);
  312. var rootOfTypeFilter = Builders<TDocument>.Filter.OfType<TDerivedDocument>();
  313. var renderedOfTypeFilter = rootOfTypeFilter.Render(_documentSerializer, _settings.SerializerRegistry);
  314. var ofTypeFilter = new BsonDocumentFilterDefinition<TDerivedDocument>(renderedOfTypeFilter);
  315. return new OfTypeMongoCollection<TDocument, TDerivedDocument>(this, derivedDocumentCollection, ofTypeFilter);
  316. }
  317. public override IMongoCollection<TDocument> WithReadConcern(ReadConcern readConcern)
  318. {
  319. var newSettings = _settings.Clone();
  320. newSettings.ReadConcern = readConcern;
  321. return new MongoCollectionImpl<TDocument>(_database, _collectionNamespace, newSettings, _cluster, _operationExecutor);
  322. }
  323. public override IMongoCollection<TDocument> WithReadPreference(ReadPreference readPreference)
  324. {
  325. var newSettings = _settings.Clone();
  326. newSettings.ReadPreference = readPreference;
  327. return new MongoCollectionImpl<TDocument>(_database, _collectionNamespace, newSettings, _cluster, _operationExecutor);
  328. }
  329. public override IMongoCollection<TDocument> WithWriteConcern(WriteConcern writeConcern)
  330. {
  331. var newSettings = _settings.Clone();
  332. newSettings.WriteConcern = writeConcern;
  333. return new MongoCollectionImpl<TDocument>(_database, _collectionNamespace, newSettings, _cluster, _operationExecutor);
  334. }
  335. // private methods
  336. private void AssignId(TDocument document)
  337. {
  338. var idProvider = _documentSerializer as IBsonIdProvider;
  339. if (idProvider != null)
  340. {
  341. object id;
  342. Type idNominalType;
  343. IIdGenerator idGenerator;
  344. if (idProvider.GetDocumentId(document, out id, out idNominalType, out idGenerator))
  345. {
  346. if (idGenerator != null && idGenerator.IsEmpty(id))
  347. {
  348. id = idGenerator.GenerateId(this, document);
  349. idProvider.SetDocumentId(document, id);
  350. }
  351. }
  352. }
  353. }
  354. private WriteRequest ConvertWriteModelToWriteRequest(WriteModel<TDocument> model, int index)
  355. {
  356. switch (model.ModelType)
  357. {
  358. case WriteModelType.InsertOne:
  359. var insertOneModel = (InsertOneModel<TDocument>)model;
  360. if (_settings.AssignIdOnInsert)
  361. {
  362. AssignId(insertOneModel.Document);
  363. }
  364. return new InsertRequest(new BsonDocumentWrapper(insertOneModel.Document, _documentSerializer))
  365. {
  366. CorrelationId = index
  367. };
  368. case WriteModelType.DeleteMany:
  369. var deleteManyModel = (DeleteManyModel<TDocument>)model;
  370. return new DeleteRequest(deleteManyModel.Filter.Render(_documentSerializer, _settings.SerializerRegistry))
  371. {
  372. CorrelationId = index,
  373. Collation = deleteManyModel.Collation,
  374. Limit = 0
  375. };
  376. case WriteModelType.DeleteOne:
  377. var deleteOneModel = (DeleteOneModel<TDocument>)model;
  378. return new DeleteRequest(deleteOneModel.Filter.Render(_documentSerializer, _settings.SerializerRegistry))
  379. {
  380. CorrelationId = index,
  381. Collation = deleteOneModel.Collation,
  382. Limit = 1
  383. };
  384. case WriteModelType.ReplaceOne:
  385. var replaceOneModel = (ReplaceOneModel<TDocument>)model;
  386. return new UpdateRequest(
  387. UpdateType.Replacement,
  388. replaceOneModel.Filter.Render(_documentSerializer, _settings.SerializerRegistry),
  389. new BsonDocumentWrapper(replaceOneModel.Replacement, _documentSerializer))
  390. {
  391. Collation = replaceOneModel.Collation,
  392. CorrelationId = index,
  393. IsMulti = false,
  394. IsUpsert = replaceOneModel.IsUpsert
  395. };
  396. case WriteModelType.UpdateMany:
  397. var updateManyModel = (UpdateManyModel<TDocument>)model;
  398. return new UpdateRequest(
  399. UpdateType.Update,
  400. updateManyModel.Filter.Render(_documentSerializer, _settings.SerializerRegistry),
  401. updateManyModel.Update.Render(_documentSerializer, _settings.SerializerRegistry))
  402. {
  403. ArrayFilters = RenderArrayFilters(updateManyModel.ArrayFilters),
  404. Collation = updateManyModel.Collation,
  405. CorrelationId = index,
  406. IsMulti = true,
  407. IsUpsert = updateManyModel.IsUpsert
  408. };
  409. case WriteModelType.UpdateOne:
  410. var updateOneModel = (UpdateOneModel<TDocument>)model;
  411. return new UpdateRequest(
  412. UpdateType.Update,
  413. updateOneModel.Filter.Render(_documentSerializer, _settings.SerializerRegistry),
  414. updateOneModel.Update.Render(_documentSerializer, _settings.SerializerRegistry))
  415. {
  416. ArrayFilters = RenderArrayFilters(updateOneModel.ArrayFilters),
  417. Collation = updateOneModel.Collation,
  418. CorrelationId = index,
  419. IsMulti = false,
  420. IsUpsert = updateOneModel.IsUpsert
  421. };
  422. default:
  423. throw new InvalidOperationException("Unknown type of WriteModel provided.");
  424. }
  425. }
  426. private AggregateOperation<TResult> CreateAggregateOperation<TResult>(RenderedPipelineDefinition<TResult> renderedPipeline, AggregateOptions options)
  427. {
  428. return new AggregateOperation<TResult>(
  429. _collectionNamespace,
  430. renderedPipeline.Documents,
  431. renderedPipeline.OutputSerializer,
  432. _messageEncoderSettings)
  433. {
  434. AllowDiskUse = options.AllowDiskUse,
  435. BatchSize = options.BatchSize,
  436. Collation = options.Collation,
  437. MaxTime = options.MaxTime,
  438. ReadConcern = _settings.ReadConcern,
  439. UseCursor = options.UseCursor
  440. };
  441. }
  442. private FindOperation<TResult> CreateAggregateToCollectionFindOperation<TResult>(BsonDocument outStage, IBsonSerializer<TResult> resultSerializer, AggregateOptions options)
  443. {
  444. var outputCollectionName = outStage.GetElement(0).Value.AsString;
  445. return new FindOperation<TResult>(
  446. new CollectionNamespace(_collectionNamespace.DatabaseNamespace, outputCollectionName),
  447. resultSerializer,
  448. _messageEncoderSettings)
  449. {
  450. BatchSize = options.BatchSize,
  451. Collation = options.Collation,
  452. MaxTime = options.MaxTime,
  453. ReadConcern = _settings.ReadConcern
  454. };
  455. }
  456. private AggregateToCollectionOperation CreateAggregateToCollectionOperation<TResult>(RenderedPipelineDefinition<TResult> renderedPipeline, AggregateOptions options)
  457. {
  458. return new AggregateToCollectionOperation(
  459. _collectionNamespace,
  460. renderedPipeline.Documents,
  461. _messageEncoderSettings)
  462. {
  463. AllowDiskUse = options.AllowDiskUse,
  464. BypassDocumentValidation = options.BypassDocumentValidation,
  465. Collation = options.Collation,
  466. MaxTime = options.MaxTime,
  467. WriteConcern = _settings.WriteConcern
  468. };
  469. }
  470. private BulkMixedWriteOperation CreateBulkWriteOperation(IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options)
  471. {
  472. return new BulkMixedWriteOperation(
  473. _collectionNamespace,
  474. requests.Select(ConvertWriteModelToWriteRequest),
  475. _messageEncoderSettings)
  476. {
  477. BypassDocumentValidation = options.BypassDocumentValidation,
  478. IsOrdered = options.IsOrdered,
  479. WriteConcern = _settings.WriteConcern
  480. };
  481. }
  482. private CountOperation CreateCountOperation(FilterDefinition<TDocument> filter, CountOptions options)
  483. {
  484. return new CountOperation(_collectionNamespace, _messageEncoderSettings)
  485. {
  486. Collation = options.Collation,
  487. Filter = filter.Render(_documentSerializer, _settings.SerializerRegistry),
  488. Hint = options.Hint,
  489. Limit = options.Limit,
  490. MaxTime = options.MaxTime,
  491. ReadConcern = _settings.ReadConcern,
  492. Skip = options.Skip
  493. };
  494. }
  495. private DistinctOperation<TField> CreateDistinctOperation<TField>(FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options)
  496. {
  497. var renderedField = field.Render(_documentSerializer, _settings.SerializerRegistry);
  498. var valueSerializer = GetValueSerializerForDistinct(renderedField, _settings.SerializerRegistry);
  499. return new DistinctOperation<TField>(
  500. _collectionNamespace,
  501. valueSerializer,
  502. renderedField.FieldName,
  503. _messageEncoderSettings)
  504. {
  505. Collation = options.Collation,
  506. Filter = filter.Render(_documentSerializer, _settings.SerializerRegistry),
  507. MaxTime = options.MaxTime,
  508. ReadConcern = _settings.ReadConcern
  509. };
  510. }
  511. private FindOneAndDeleteOperation<TProjection> CreateFindOneAndDeleteOperation<TProjection>(FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options)
  512. {
  513. var projection = options.Projection ?? new ClientSideDeserializationProjectionDefinition<TDocument, TProjection>();
  514. var renderedProjection = projection.Render(_documentSerializer, _settings.SerializerRegistry);
  515. return new FindOneAndDeleteOperation<TProjection>(
  516. _collectionNamespace,
  517. filter.Render(_documentSerializer, _settings.SerializerRegistry),
  518. new FindAndModifyValueDeserializer<TProjection>(renderedProjection.ProjectionSerializer),
  519. _messageEncoderSettings)
  520. {
  521. Collation = options.Collation,
  522. MaxTime = options.MaxTime,
  523. Projection = renderedProjection.Document,
  524. Sort = options.Sort == null ? null : options.Sort.Render(_documentSerializer, _settings.SerializerRegistry),
  525. WriteConcern = _settings.WriteConcern
  526. };
  527. }
  528. private FindOneAndReplaceOperation<TProjection> CreateFindOneAndReplaceOperation<TProjection>(FilterDefinition<TDocument> filter, object replacementObject, FindOneAndReplaceOptions<TDocument, TProjection> options)
  529. {
  530. var projection = options.Projection ?? new ClientSideDeserializationProjectionDefinition<TDocument, TProjection>();
  531. var renderedProjection = projection.Render(_documentSerializer, _settings.SerializerRegistry);
  532. return new FindOneAndReplaceOperation<TProjection>(
  533. _collectionNamespace,
  534. filter.Render(_documentSerializer, _settings.SerializerRegistry),
  535. new BsonDocumentWrapper(replacementObject, _documentSerializer),
  536. new FindAndModifyValueDeserializer<TProjection>(renderedProjection.ProjectionSerializer),
  537. _messageEncoderSettings)
  538. {
  539. BypassDocumentValidation = options.BypassDocumentValidation,
  540. Collation = options.Collation,
  541. IsUpsert = options.IsUpsert,
  542. MaxTime = options.MaxTime,
  543. Projection = renderedProjection.Document,
  544. ReturnDocument = options.ReturnDocument.ToCore(),
  545. Sort = options.Sort == null ? null : options.Sort.Render(_documentSerializer, _settings.SerializerRegistry),
  546. WriteConcern = _settings.WriteConcern
  547. };
  548. }
  549. private FindOneAndUpdateOperation<TProjection> CreateFindOneAndUpdateOperation<TProjection>(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options)
  550. {
  551. var projection = options.Projection ?? new ClientSideDeserializationProjectionDefinition<TDocument, TProjection>();
  552. var renderedProjection = projection.Render(_documentSerializer, _settings.SerializerRegistry);
  553. return new FindOneAndUpdateOperation<TProjection>(
  554. _collectionNamespace,
  555. filter.Render(_documentSerializer, _settings.SerializerRegistry),
  556. update.Render(_documentSerializer, _settings.SerializerRegistry),
  557. new FindAndModifyValueDeserializer<TProjection>(renderedProjection.ProjectionSerializer),
  558. _messageEncoderSettings)
  559. {
  560. ArrayFilters = RenderArrayFilters(options.ArrayFilters),
  561. BypassDocumentValidation = options.BypassDocumentValidation,
  562. Collation = options.Collation,
  563. IsUpsert = options.IsUpsert,
  564. MaxTime = options.MaxTime,
  565. Projection = renderedProjection.Document,
  566. ReturnDocument = options.ReturnDocument.ToCore(),
  567. Sort = options.Sort == null ? null : options.Sort.Render(_documentSerializer, _settings.SerializerRegistry),
  568. WriteConcern = _settings.WriteConcern
  569. };
  570. }
  571. private FindOperation<TProjection> CreateFindOperation<TProjection>(FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options)
  572. {
  573. var projection = options.Projection ?? new ClientSideDeserializationProjectionDefinition<TDocument, TProjection>();
  574. var renderedProjection = projection.Render(_documentSerializer, _settings.SerializerRegistry);
  575. return new FindOperation<TProjection>(
  576. _collectionNamespace,
  577. renderedProjection.ProjectionSerializer,
  578. _messageEncoderSettings)
  579. {
  580. AllowPartialResults = options.AllowPartialResults,
  581. BatchSize = options.BatchSize,
  582. Collation = options.Collation,
  583. Comment = options.Comment,
  584. CursorType = options.CursorType.ToCore(),
  585. Filter = filter.Render(_documentSerializer, _settings.SerializerRegistry),
  586. Limit = options.Limit,
  587. MaxAwaitTime = options.MaxAwaitTime,
  588. MaxTime = options.MaxTime,
  589. Modifiers = options.Modifiers,
  590. NoCursorTimeout = options.NoCursorTimeout,
  591. OplogReplay = options.OplogReplay,
  592. Projection = renderedProjection.Document,
  593. ReadConcern = _settings.ReadConcern,
  594. Skip = options.Skip,
  595. Sort = options.Sort == null ? null : options.Sort.Render(_documentSerializer, _settings.SerializerRegistry)
  596. };
  597. }
  598. private MapReduceOperation<TResult> CreateMapReduceOperation<TResult>(BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options, IBsonSerializer<TResult> resultSerializer)
  599. {
  600. return new MapReduceOperation<TResult>(
  601. _collectionNamespace,
  602. map,
  603. reduce,
  604. resultSerializer,
  605. _messageEncoderSettings)
  606. {
  607. Collation = options.Collation,
  608. Filter = options.Filter == null ? null : options.Filter.Render(_documentSerializer, _settings.SerializerRegistry),
  609. FinalizeFunction = options.Finalize,
  610. JavaScriptMode = options.JavaScriptMode,
  611. Limit = options.Limit,
  612. MaxTime = options.MaxTime,
  613. ReadConcern = _settings.ReadConcern,
  614. Scope = options.Scope,
  615. Sort = options.Sort == null ? null : options.Sort.Render(_documentSerializer, _settings.SerializerRegistry),
  616. Verbose = options.Verbose
  617. };
  618. }
  619. private MapReduceOutputToCollectionOperation CreateMapReduceOutputToCollectionOperation<TResult>(BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options, MapReduceOutputOptions outputOptions)
  620. {
  621. var collectionOutputOptions = (MapReduceOutputOptions.CollectionOutput)outputOptions;
  622. var databaseNamespace = collectionOutputOptions.DatabaseName == null ?
  623. _collectionNamespace.DatabaseNamespace :
  624. new DatabaseNamespace(collectionOutputOptions.DatabaseName);
  625. var outputCollectionNamespace = new CollectionNamespace(databaseNamespace, collectionOutputOptions.CollectionName);
  626. return new MapReduceOutputToCollectionOperation(
  627. _collectionNamespace,
  628. outputCollectionNamespace,
  629. map,
  630. reduce,
  631. _messageEncoderSettings)
  632. {
  633. BypassDocumentValidation = options.BypassDocumentValidation,
  634. Collation = options.Collation,
  635. Filter = options.Filter == null ? null : options.Filter.Render(_documentSerializer, _settings.SerializerRegistry),
  636. FinalizeFunction = options.Finalize,
  637. JavaScriptMode = options.JavaScriptMode,
  638. Limit = options.Limit,
  639. MaxTime = options.MaxTime,
  640. NonAtomicOutput = collectionOutputOptions.NonAtomic,
  641. Scope = options.Scope,
  642. OutputMode = collectionOutputOptions.OutputMode,
  643. ShardedOutput = collectionOutputOptions.Sharded,
  644. Sort = options.Sort == null ? null : options.Sort.Render(_documentSerializer, _settings.SerializerRegistry),
  645. Verbose = options.Verbose,
  646. WriteConcern = _settings.WriteConcern
  647. };
  648. }
  649. private FindOperation<TResult> CreateMapReduceOutputToCollectionFindOperation<TResult>(MapReduceOptions<TDocument, TResult> options, CollectionNamespace outputCollectionNamespace, IBsonSerializer<TResult> resultSerializer)
  650. {
  651. return new FindOperation<TResult>(
  652. outputCollectionNamespace,
  653. resultSerializer,
  654. _messageEncoderSettings)
  655. {
  656. Collation = options.Collation,
  657. MaxTime = options.MaxTime,
  658. ReadConcern = _settings.ReadConcern
  659. };
  660. }
  661. private IBsonSerializer<TField> GetValueSerializerForDistinct<TField>(RenderedFieldDefinition<TField> renderedField, IBsonSerializerRegistry serializerRegistry)
  662. {
  663. if (renderedField.UnderlyingSerializer != null)
  664. {
  665. if (renderedField.UnderlyingSerializer.ValueType == typeof(TField))
  666. {
  667. return (IBsonSerializer<TField>)renderedField.UnderlyingSerializer;
  668. }
  669. var arraySerializer = renderedField.UnderlyingSerializer as IBsonArraySerializer;
  670. if (arraySerializer != null)
  671. {
  672. BsonSerializationInfo itemSerializationInfo;
  673. if (arraySerializer.TryGetItemSerializationInfo(out itemSerializationInfo))
  674. {
  675. if (itemSerializationInfo.Serializer.ValueType == typeof(TField))
  676. {
  677. return (IBsonSerializer<TField>)itemSerializationInfo.Serializer;
  678. }
  679. }
  680. }
  681. }
  682. return serializerRegistry.GetSerializer<TField>();
  683. }
  684. private TResult ExecuteReadOperation<TResult>(IReadOperation<TResult> operation, CancellationToken cancellationToken)
  685. {
  686. return ExecuteReadOperation(operation, _settings.ReadPreference, cancellationToken);
  687. }
  688. private TResult ExecuteReadOperation<TResult>(IReadOperation<TResult> operation, ReadPreference readPreference, CancellationToken cancellationToken)
  689. {
  690. using (var binding = new ReadPreferenceBinding(_cluster, readPreference))
  691. {
  692. return _operationExecutor.ExecuteReadOperation(binding, operation, cancellationToken);
  693. }
  694. }
  695. private Task<TResult> ExecuteReadOperationAsync<TResult>(IReadOperation<TResult> operation, CancellationToken cancellationToken)
  696. {
  697. return ExecuteReadOperationAsync(operation, _settings.ReadPreference, cancellationToken);
  698. }
  699. private async Task<TResult> ExecuteReadOperationAsync<TResult>(IReadOperation<TResult> operation, ReadPreference readPreference, CancellationToken cancellationToken)
  700. {
  701. using (var binding = new ReadPreferenceBinding(_cluster, readPreference))
  702. {
  703. return await _operationExecutor.ExecuteReadOperationAsync(binding, operation, cancellationToken).ConfigureAwait(false);
  704. }
  705. }
  706. private TResult ExecuteWriteOperation<TResult>(IWriteOperation<TResult> operation, CancellationToken cancellationToken)
  707. {
  708. using (var binding = new WritableServerBinding(_cluster))
  709. {
  710. return _operationExecutor.ExecuteWriteOperation(binding, operation, cancellationToken);
  711. }
  712. }
  713. private async Task<TResult> ExecuteWriteOperationAsync<TResult>(IWriteOperation<TResult> operation, CancellationToken cancellationToken)
  714. {
  715. using (var binding = new WritableServerBinding(_cluster))
  716. {
  717. return await _operationExecutor.ExecuteWriteOperationAsync(binding, operation, cancellationToken).ConfigureAwait(false);
  718. }
  719. }
  720. private IEnumerable<BsonDocument> RenderArrayFilters(IEnumerable<ArrayFilterDefinition> arrayFilters)
  721. {
  722. if (arrayFilters == null)
  723. {
  724. return null;
  725. }
  726. var renderedArrayFilters = new List<BsonDocument>();
  727. foreach (var arrayFilter in arrayFilters)
  728. {
  729. var renderedArrayFilter = arrayFilter.Render(null, _settings.SerializerRegistry);
  730. renderedArrayFilters.Add(renderedArrayFilter);
  731. }
  732. return renderedArrayFilters;
  733. }
  734. private IBsonSerializer<TResult> ResolveResultSerializer<TResult>(IBsonSerializer<TResult> resultSerializer)
  735. {
  736. if (resultSerializer != null)
  737. {
  738. return resultSerializer;
  739. }
  740. if (typeof(TResult) == typeof(TDocument) && _documentSerializer != null)
  741. {
  742. return (IBsonSerializer<TResult>)_documentSerializer;
  743. }
  744. return _settings.SerializerRegistry.GetSerializer<TResult>();
  745. }
  746. private class MongoIndexManager : MongoIndexManagerBase<TDocument>
  747. {
  748. // private fields
  749. private readonly MongoCollectionImpl<TDocument> _collection;
  750. // constructors
  751. public MongoIndexManager(MongoCollectionImpl<TDocument> collection)
  752. {
  753. _collection = collection;
  754. }
  755. // public properties
  756. public override CollectionNamespace CollectionNamespace
  757. {
  758. get { return _collection.CollectionNamespace; }
  759. }
  760. public override IBsonSerializer<TDocument> DocumentSerializer
  761. {
  762. get { return _collection.DocumentSerializer; }
  763. }
  764. public override MongoCollectionSettings Settings
  765. {
  766. get { return _collection._settings; }
  767. }
  768. // public methods
  769. public override IEnumerable<string> CreateMany(IEnumerable<CreateIndexModel<TDocument>> models, CancellationToken cancellationToken = default(CancellationToken))
  770. {
  771. Ensure.IsNotNull(models, nameof(models));
  772. var requests = CreateCreateIndexRequests(models);
  773. var operation = CreateCreateIndexesOperation(requests);
  774. _collection.ExecuteWriteOperation(operation, cancellationToken);
  775. return requests.Select(x => x.GetIndexName());
  776. }
  777. public async override Task<IEnumerable<string>> CreateManyAsync(IEnumerable<CreateIndexModel<TDocument>> models, CancellationToken cancellationToken = default(CancellationToken))
  778. {
  779. Ensure.IsNotNull(models, nameof(models));
  780. var requests = CreateCreateIndexRequests(models);
  781. var operation = CreateCreateIndexesOperation(requests);
  782. await _collection.ExecuteWriteOperationAsync(operation, cancellationToken).ConfigureAwait(false);
  783. return requests.Select(x => x.GetIndexName());
  784. }
  785. public override void DropAll(CancellationToken cancellationToken)
  786. {
  787. var operation = CreateDropAllOperation();
  788. _collection.ExecuteWriteOperation(operation, cancellationToken);
  789. }
  790. public override Task DropAllAsync(CancellationToken cancellationToken)
  791. {
  792. var operation = CreateDropAllOperation();
  793. return _collection.ExecuteWriteOperationAsync(operation, cancellationToken);
  794. }
  795. public override void DropOne(string name, CancellationToken cancellationToken)
  796. {
  797. Ensure.IsNotNullOrEmpty(name, nameof(name));
  798. if (name == "*")
  799. {
  800. throw new ArgumentException("Cannot specify '*' for the index name. Use DropAllAsync to drop all indexes.", "name");
  801. }
  802. var operation = CreateDropOneOperation(name);
  803. _collection.ExecuteWriteOperation(operation, cancellationToken);
  804. }
  805. public override Task DropOneAsync(string name, CancellationToken cancellationToken)
  806. {
  807. Ensure.IsNotNullOrEmpty(name, nameof(name));
  808. if (name == "*")
  809. {
  810. throw new ArgumentException("Cannot specify '*' for the index name. Use DropAllAsync to drop all indexes.", "name");
  811. }
  812. var operation = CreateDropOneOperation(name);
  813. return _collection.ExecuteWriteOperationAsync(operation, cancellationToken);
  814. }
  815. public override IAsyncCursor<BsonDocument> List(CancellationToken cancellationToken = default(CancellationToken))
  816. {
  817. var operation = CreateListIndexesOperation();
  818. return _collection.ExecuteReadOperation(operation, ReadPreference.Primary, cancellationToken);
  819. }
  820. public override Task<IAsyncCursor<BsonDocument>> ListAsync(CancellationToken cancellationToken = default(CancellationToken))
  821. {
  822. var operation = CreateListIndexesOperation();
  823. return _collection.ExecuteReadOperationAsync(operation, ReadPreference.Primary, cancellationToken);
  824. }
  825. // private methods
  826. private CreateIndexesOperation CreateCreateIndexesOperation(IEnumerable<CreateIndexRequest> requests)
  827. {
  828. return new CreateIndexesOperation(_collection._collectionNamespace, requests, _collection._messageEncoderSettings)
  829. {
  830. WriteConcern = _collection.Settings.WriteConcern
  831. };
  832. }
  833. private IEnumerable<CreateIndexRequest> CreateCreateIndexRequests(IEnumerable<CreateIndexModel<TDocument>> models)
  834. {
  835. return models.Select(m =>
  836. {
  837. var options = m.Options ?? new CreateIndexOptions<TDocument>();
  838. var keysDocument = m.Keys.Render(_collection._documentSerializer, _collection._settings.SerializerRegistry);
  839. var renderedPartialFilterExpression = options.PartialFilterExpression == null ? null : options.PartialFilterExpression.Render(_collection._documentSerializer, _collection._settings.SerializerRegistry);
  840. return new CreateIndexRequest(keysDocument)
  841. {
  842. Name = options.Name,
  843. Background = options.Background,
  844. Bits = options.Bits,
  845. BucketSize = options.BucketSize,
  846. Collation = options.Collation,
  847. DefaultLanguage = options.DefaultLanguage,
  848. ExpireAfter = options.ExpireAfter,
  849. LanguageOverride = options.LanguageOverride,
  850. Max = options.Max,
  851. Min = options.Min,
  852. PartialFilterExpression = renderedPartialFilterExpression,
  853. Sparse = options.Sparse,
  854. SphereIndexVersion = options.SphereIndexVersion,
  855. StorageEngine = options.StorageEngine,
  856. TextIndexVersion = options.TextIndexVersion,
  857. Unique = options.Unique,
  858. Version = options.Version,
  859. Weights = options.Weights
  860. };
  861. });
  862. }
  863. private DropIndexOperation CreateDropAllOperation()
  864. {
  865. return new DropIndexOperation(_collection._collectionNamespace, "*", _collection._messageEncoderSettings)
  866. {
  867. WriteConcern = _collection.Settings.WriteConcern
  868. };
  869. }
  870. private DropIndexOperation CreateDropOneOperation(string name)
  871. {
  872. return new DropIndexOperation(_collection._collectionNamespace, name, _collection._messageEncoderSettings)
  873. {
  874. WriteConcern = _collection.Settings.WriteConcern
  875. };
  876. }
  877. private ListIndexesOperation CreateListIndexesOperation()
  878. {
  879. return new ListIndexesOperation(_collection._collectionNamespace, _collection._messageEncoderSettings);
  880. }
  881. }
  882. }
  883. }