AggregateFluent.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  /* Copyright 2010-2016 MongoDB Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
  15. using System.Collections.Generic;
  16. using System.Linq;
  17. using System.Threading;
  18. using System.Threading.Tasks;
  19. using MongoDB.Bson.Serialization;
  20. using MongoDB.Driver.Core.Misc;
  namespace MongoDB.Driver
  {
      /// <summary>
      /// Fluent aggregation builder that pairs a collection with a pipeline and a set of aggregate options.
      /// </summary>
      /// <remarks>
      /// The three fields are readonly and no member of this class reassigns them. Every stage method
      /// asks the current pipeline for an extended pipeline and passes it to <c>WithPipeline</c>, which
      /// returns a new <c>AggregateFluent</c> that reuses the same collection and options references.
      /// </remarks>
      /// <typeparam name="TDocument">The document type of the collection, i.e. the pipeline input type.</typeparam>
      /// <typeparam name="TResult">The output type of the current pipeline.</typeparam>
      internal class AggregateFluent<TDocument, TResult> : AggregateFluentBase<TResult>
      {
          // fields
          private readonly IMongoCollection<TDocument> _collection;
          private readonly AggregateOptions _options;
          private readonly PipelineDefinition<TDocument, TResult> _pipeline;

          // constructors
          /// <summary>
          /// Creates a fluent over the given collection, pipeline and options.
          /// Each argument is passed through <c>Ensure.IsNotNull</c>, checked in the order
          /// collection, pipeline, options.
          /// </summary>
          public AggregateFluent(
              IMongoCollection<TDocument> collection,
              PipelineDefinition<TDocument, TResult> pipeline,
              AggregateOptions options)
          {
              _collection = Ensure.IsNotNull(collection, nameof(collection));
              _pipeline = Ensure.IsNotNull(pipeline, nameof(pipeline));
              _options = Ensure.IsNotNull(options, nameof(options));
          }

          // properties
          /// <summary>The database of the underlying collection.</summary>
          public override IMongoDatabase Database => _collection.Database;

          /// <summary>The options instance supplied at construction.</summary>
          public override AggregateOptions Options => _options;

          /// <summary>
          /// The pipeline stages, copied into a new list on every access; changes made to the
          /// returned list are not written back to the pipeline held by this instance.
          /// </summary>
          public override IList<IPipelineStageDefinition> Stages => _pipeline.Stages.ToList();

          // methods
          /// <summary>Returns a new fluent whose pipeline is the current one with <paramref name="stage"/> appended.</summary>
          public override IAggregateFluent<TNewResult> AppendStage<TNewResult>(PipelineStageDefinition<TResult, TNewResult> stage)
              => WithPipeline(_pipeline.AppendStage(stage));

          /// <summary>Returns a new fluent over the pipeline produced by <c>As</c> with the given serializer.</summary>
          public override IAggregateFluent<TNewResult> As<TNewResult>(IBsonSerializer<TNewResult> newResultSerializer)
              => WithPipeline(_pipeline.As(newResultSerializer));

          /// <summary>Returns a new fluent over the pipeline produced by <c>Bucket</c> (no output projection).</summary>
          public override IAggregateFluent<AggregateBucketResult<TValue>> Bucket<TValue>(
              AggregateExpressionDefinition<TResult, TValue> groupBy,
              IEnumerable<TValue> boundaries,
              AggregateBucketOptions<TValue> options = null)
              => WithPipeline(_pipeline.Bucket(groupBy, boundaries, options));

          /// <summary>Returns a new fluent over the pipeline produced by <c>Bucket</c> with an output projection.</summary>
          public override IAggregateFluent<TNewResult> Bucket<TValue, TNewResult>(
              AggregateExpressionDefinition<TResult, TValue> groupBy,
              IEnumerable<TValue> boundaries,
              ProjectionDefinition<TResult, TNewResult> output,
              AggregateBucketOptions<TValue> options = null)
              => WithPipeline(_pipeline.Bucket(groupBy, boundaries, output, options));

          /// <summary>Returns a new fluent over the pipeline produced by <c>BucketAuto</c> (no output projection).</summary>
          public override IAggregateFluent<AggregateBucketAutoResult<TValue>> BucketAuto<TValue>(
              AggregateExpressionDefinition<TResult, TValue> groupBy,
              int buckets,
              AggregateBucketAutoOptions options = null)
              => WithPipeline(_pipeline.BucketAuto(groupBy, buckets, options));

          /// <summary>Returns a new fluent over the pipeline produced by <c>BucketAuto</c> with an output projection.</summary>
          public override IAggregateFluent<TNewResult> BucketAuto<TValue, TNewResult>(
              AggregateExpressionDefinition<TResult, TValue> groupBy,
              int buckets,
              ProjectionDefinition<TResult, TNewResult> output,
              AggregateBucketAutoOptions options = null)
              => WithPipeline(_pipeline.BucketAuto(groupBy, buckets, output, options));

          /// <summary>Returns a new fluent over the pipeline produced by <c>Count</c>.</summary>
          public override IAggregateFluent<AggregateCountResult> Count()
              => WithPipeline(_pipeline.Count());

          /// <summary>Returns a new fluent over the pipeline produced by <c>Facet</c>.</summary>
          public override IAggregateFluent<TNewResult> Facet<TNewResult>(
              IEnumerable<AggregateFacet<TResult>> facets,
              AggregateFacetOptions<TNewResult> options = null)
              => WithPipeline(_pipeline.Facet(facets, options));

          /// <summary>Returns a new fluent over the pipeline produced by <c>GraphLookup</c>; all arguments are forwarded unchanged.</summary>
          public override IAggregateFluent<TNewResult> GraphLookup<TFrom, TConnectFrom, TConnectTo, TStartWith, TAsElement, TAs, TNewResult>(
              IMongoCollection<TFrom> from,
              FieldDefinition<TFrom, TConnectFrom> connectFromField,
              FieldDefinition<TFrom, TConnectTo> connectToField,
              AggregateExpressionDefinition<TResult, TStartWith> startWith,
              FieldDefinition<TNewResult, TAs> @as,
              FieldDefinition<TAsElement, int> depthField,
              AggregateGraphLookupOptions<TFrom, TAsElement, TNewResult> options = null)
              => WithPipeline(_pipeline.GraphLookup(from, connectFromField, connectToField, startWith, @as, depthField, options));

          /// <summary>Returns a new fluent over the pipeline produced by <c>Group</c>.</summary>
          public override IAggregateFluent<TNewResult> Group<TNewResult>(ProjectionDefinition<TResult, TNewResult> group)
              => WithPipeline(_pipeline.Group(group));

          /// <summary>Returns a new fluent over the pipeline produced by <c>Limit</c>.</summary>
          public override IAggregateFluent<TResult> Limit(int limit)
              => WithPipeline(_pipeline.Limit(limit));

          /// <summary>
          /// Resolves <paramref name="foreignCollectionName"/> to a collection in this collection's
          /// database and returns a new fluent over the pipeline produced by <c>Lookup</c>.
          /// The name is passed through <c>Ensure.IsNotNull</c> first.
          /// </summary>
          public override IAggregateFluent<TNewResult> Lookup<TForeignDocument, TNewResult>(
              string foreignCollectionName,
              FieldDefinition<TResult> localField,
              FieldDefinition<TForeignDocument> foreignField,
              FieldDefinition<TNewResult> @as,
              AggregateLookupOptions<TForeignDocument, TNewResult> options)
          {
              Ensure.IsNotNull(foreignCollectionName, nameof(foreignCollectionName));

              var lookupSource = _collection.Database.GetCollection<TForeignDocument>(foreignCollectionName);
              var extendedPipeline = _pipeline.Lookup(lookupSource, localField, foreignField, @as, options);
              return WithPipeline(extendedPipeline);
          }

          /// <summary>Returns a new fluent over the pipeline produced by <c>Match</c>.</summary>
          public override IAggregateFluent<TResult> Match(FilterDefinition<TResult> filter)
              => WithPipeline(_pipeline.Match(filter));

          /// <summary>Returns a new fluent over the pipeline produced by <c>OfType</c> with the given serializer.</summary>
          public override IAggregateFluent<TNewResult> OfType<TNewResult>(IBsonSerializer<TNewResult> newResultSerializer)
              => WithPipeline(_pipeline.OfType(newResultSerializer));

          /// <summary>
          /// Builds the fluent with the <c>Out</c> stage targeting <paramref name="collectionName"/>
          /// and obtains its cursor synchronously.
          /// </summary>
          public override IAsyncCursor<TResult> Out(string collectionName, CancellationToken cancellationToken)
              => WithOutStage(collectionName).ToCursor(cancellationToken);

          /// <summary>
          /// Builds the fluent with the <c>Out</c> stage targeting <paramref name="collectionName"/>
          /// and returns the task that yields its cursor.
          /// </summary>
          public override Task<IAsyncCursor<TResult>> OutAsync(string collectionName, CancellationToken cancellationToken)
              => WithOutStage(collectionName).ToCursorAsync(cancellationToken);

          /// <summary>Returns a new fluent over the pipeline produced by <c>Project</c>.</summary>
          public override IAggregateFluent<TNewResult> Project<TNewResult>(ProjectionDefinition<TResult, TNewResult> projection)
              => WithPipeline(_pipeline.Project(projection));

          /// <summary>Returns a new fluent over the pipeline produced by <c>ReplaceRoot</c>.</summary>
          public override IAggregateFluent<TNewResult> ReplaceRoot<TNewResult>(AggregateExpressionDefinition<TResult, TNewResult> newRoot)
              => WithPipeline(_pipeline.ReplaceRoot(newRoot));

          /// <summary>Returns a new fluent over the pipeline produced by <c>Skip</c>.</summary>
          public override IAggregateFluent<TResult> Skip(int skip)
              => WithPipeline(_pipeline.Skip(skip));

          /// <summary>Returns a new fluent over the pipeline produced by <c>Sort</c>.</summary>
          public override IAggregateFluent<TResult> Sort(SortDefinition<TResult> sort)
              => WithPipeline(_pipeline.Sort(sort));

          /// <summary>Returns a new fluent over the pipeline produced by <c>SortByCount</c>.</summary>
          public override IAggregateFluent<AggregateSortByCountResult<TId>> SortByCount<TId>(AggregateExpressionDefinition<TResult, TId> id)
              => WithPipeline(_pipeline.SortByCount(id));

          /// <summary>
          /// Replaces the final pipeline stage with a sort that combines that stage's existing sort
          /// with <paramref name="newSort"/>, and returns a fluent over the rebuilt pipeline.
          /// </summary>
          /// <remarks>
          /// The final stage is cast to <c>SortPipelineStageDefinition</c>, so the call throws
          /// <c>InvalidCastException</c> when the pipeline does not end in such a stage, and the list
          /// indexer throws <c>ArgumentOutOfRangeException</c> when the pipeline has no stages.
          /// </remarks>
          public override IOrderedAggregateFluent<TResult> ThenBy(SortDefinition<TResult> newSort)
          {
              Ensure.IsNotNull(newSort, nameof(newSort));

              // Work on a copy of the stage sequence; the pipeline held by this instance is not touched.
              var stageList = _pipeline.Stages.ToList();
              var lastIndex = stageList.Count - 1;

              var previousSortStage = (SortPipelineStageDefinition<TResult>)stageList[lastIndex];
              var mergedSort = Builders<TResult>.Sort.Combine(previousSortStage.Sort, newSort);
              stageList[lastIndex] = PipelineStageDefinitionBuilder.Sort(mergedSort);

              var replacementPipeline = new PipelineStagePipelineDefinition<TDocument, TResult>(stageList);
              return (IOrderedAggregateFluent<TResult>)WithPipeline(replacementPipeline);
          }

          /// <summary>
          /// Wraps <paramref name="newResultSerializer"/> in a fresh <c>AggregateUnwindOptions</c>
          /// and returns a new fluent over the pipeline produced by <c>Unwind</c>.
          /// </summary>
          public override IAggregateFluent<TNewResult> Unwind<TNewResult>(FieldDefinition<TResult> field, IBsonSerializer<TNewResult> newResultSerializer)
          {
              var unwindOptions = new AggregateUnwindOptions<TNewResult> { ResultSerializer = newResultSerializer };
              return WithPipeline(_pipeline.Unwind(field, unwindOptions));
          }

          /// <summary>Returns a new fluent over the pipeline produced by <c>Unwind</c> with the supplied options.</summary>
          public override IAggregateFluent<TNewResult> Unwind<TNewResult>(FieldDefinition<TResult> field, AggregateUnwindOptions<TNewResult> options)
              => WithPipeline(_pipeline.Unwind(field, options));

          /// <summary>Calls <c>Aggregate</c> on the collection with the current pipeline and options.</summary>
          public override IAsyncCursor<TResult> ToCursor(CancellationToken cancellationToken)
              => _collection.Aggregate(_pipeline, _options, cancellationToken);

          /// <summary>Calls <c>AggregateAsync</c> on the collection with the current pipeline and options.</summary>
          public override Task<IAsyncCursor<TResult>> ToCursorAsync(CancellationToken cancellationToken)
              => _collection.AggregateAsync(_pipeline, _options, cancellationToken);

          /// <summary>Formats the pipeline's own string form inside <c>aggregate(...)</c>.</summary>
          public override string ToString() => $"aggregate({_pipeline})";

          /// <summary>
          /// Creates a new fluent over <paramref name="pipeline"/> that reuses this instance's
          /// collection and options references.
          /// </summary>
          public IAggregateFluent<TNewResult> WithPipeline<TNewResult>(PipelineDefinition<TDocument, TNewResult> pipeline)
              => new AggregateFluent<TDocument, TNewResult>(_collection, pipeline, _options);

          // private methods
          // Shared preamble of Out and OutAsync: validates the name, resolves the output collection
          // through the Database property and returns the fluent with the Out stage applied.
          private IAggregateFluent<TResult> WithOutStage(string collectionName)
          {
              Ensure.IsNotNull(collectionName, nameof(collectionName));

              var target = Database.GetCollection<TResult>(collectionName);
              return WithPipeline(_pipeline.Out(target));
          }
      }
  }