AggregateFluent.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. /* Copyright 2010-present MongoDB Inc.
  2. *
  3. * Licensed under the Apache License, Version 2.0 (the "License");
  4. * you may not use this file except in compliance with the License.
  5. * You may obtain a copy of the License at
  6. *
  7. * http://www.apache.org/licenses/LICENSE-2.0
  8. *
  9. * Unless required by applicable law or agreed to in writing, software
  10. * distributed under the License is distributed on an "AS IS" BASIS,
  11. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. * See the License for the specific language governing permissions and
  13. * limitations under the License.
  14. */
  15. using MongoDB.Bson;
  16. using MongoDB.Bson.Serialization;
  17. using MongoDB.Driver.Core.Misc;
  18. using System.Collections.Generic;
  19. using System.Linq;
  20. using System.Threading;
  21. using System.Threading.Tasks;
  22. namespace MongoDB.Driver
  23. {
  /// <summary>
  /// Fluent aggregate builder bound to a collection, an optional client session,
  /// a pipeline and a set of aggregate options.
  /// </summary>
  /// <remarks>
  /// Instances are never mutated: every field is readonly and each stage method
  /// hands the extended pipeline to <see cref="WithPipeline{TNewResult}"/>, which
  /// constructs a new <see cref="AggregateFluent{TDocument, TResult}"/> that reuses
  /// the same session, collection and options.
  /// </remarks>
  /// <typeparam name="TDocument">The document type of the underlying collection (the pipeline input).</typeparam>
  /// <typeparam name="TResult">The output type of the current pipeline.</typeparam>
  internal class AggregateFluent<TDocument, TResult> : AggregateFluentBase<TResult>
  {
      // fields
      private readonly IMongoCollection<TDocument> _collection;
      private readonly AggregateOptions _options;
      private readonly PipelineDefinition<TDocument, TResult> _pipeline;
      private readonly IClientSessionHandle _session;

      // constructors
      /// <summary>
      /// Initializes a new instance of the <see cref="AggregateFluent{TDocument, TResult}"/> class.
      /// </summary>
      /// <param name="session">The session to run the aggregation in; may be null, in which case the session-less overloads are used.</param>
      /// <param name="collection">The collection the aggregation runs against. Must not be null.</param>
      /// <param name="pipeline">The pipeline built so far. Must not be null.</param>
      /// <param name="options">The aggregate options. Must not be null.</param>
      public AggregateFluent(IClientSessionHandle session, IMongoCollection<TDocument> collection, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options)
      {
          _session = session; // can be null
          _collection = Ensure.IsNotNull(collection, nameof(collection));
          _pipeline = Ensure.IsNotNull(pipeline, nameof(pipeline));
          _options = Ensure.IsNotNull(options, nameof(options));
      }

      // properties
      /// <summary>Gets the database of the underlying collection.</summary>
      public override IMongoDatabase Database
      {
          get { return _collection.Database; }
      }

      /// <summary>Gets the aggregate options this fluent was created with.</summary>
      public override AggregateOptions Options
      {
          get { return _options; }
      }

      /// <summary>
      /// Gets the stages of the current pipeline. A new list is materialized on every
      /// access, so modifying the returned list does not affect this fluent.
      /// </summary>
      public override IList<IPipelineStageDefinition> Stages
      {
          get { return _pipeline.Stages.ToList(); }
      }

      // methods
      // Unless noted otherwise, each stage method below delegates to the pipeline
      // builder method of the same name on _pipeline and wraps the resulting
      // pipeline in a new fluent via WithPipeline.

      /// <summary>Returns a new fluent with <paramref name="stage"/> appended to the pipeline.</summary>
      public override IAggregateFluent<TNewResult> AppendStage<TNewResult>(PipelineStageDefinition<TResult, TNewResult> stage)
      {
          return WithPipeline(_pipeline.AppendStage(stage));
      }

      /// <summary>Returns a new fluent whose pipeline is <c>_pipeline.As(newResultSerializer)</c>.</summary>
      public override IAggregateFluent<TNewResult> As<TNewResult>(IBsonSerializer<TNewResult> newResultSerializer)
      {
          return WithPipeline(_pipeline.As(newResultSerializer));
      }

      /// <summary>Returns a new fluent with a bucket stage built from the group-by expression, boundaries and options.</summary>
      public override IAggregateFluent<AggregateBucketResult<TValue>> Bucket<TValue>(
          AggregateExpressionDefinition<TResult, TValue> groupBy,
          IEnumerable<TValue> boundaries,
          AggregateBucketOptions<TValue> options = null)
      {
          return WithPipeline(_pipeline.Bucket(groupBy, boundaries, options));
      }

      /// <summary>Returns a new fluent with a bucket stage that additionally applies the <paramref name="output"/> projection.</summary>
      public override IAggregateFluent<TNewResult> Bucket<TValue, TNewResult>(
          AggregateExpressionDefinition<TResult, TValue> groupBy,
          IEnumerable<TValue> boundaries,
          ProjectionDefinition<TResult, TNewResult> output,
          AggregateBucketOptions<TValue> options = null)
      {
          return WithPipeline(_pipeline.Bucket(groupBy, boundaries, output, options));
      }

      /// <summary>Returns a new fluent with a bucket-auto stage built from the group-by expression, bucket count and options.</summary>
      public override IAggregateFluent<AggregateBucketAutoResult<TValue>> BucketAuto<TValue>(
          AggregateExpressionDefinition<TResult, TValue> groupBy,
          int buckets,
          AggregateBucketAutoOptions options = null)
      {
          return WithPipeline(_pipeline.BucketAuto(groupBy, buckets, options));
      }

      /// <summary>Returns a new fluent with a bucket-auto stage that additionally applies the <paramref name="output"/> projection.</summary>
      public override IAggregateFluent<TNewResult> BucketAuto<TValue, TNewResult>(
          AggregateExpressionDefinition<TResult, TValue> groupBy,
          int buckets,
          ProjectionDefinition<TResult, TNewResult> output,
          AggregateBucketAutoOptions options = null)
      {
          return WithPipeline(_pipeline.BucketAuto(groupBy, buckets, output, options));
      }

      /// <summary>Returns a new fluent whose pipeline is <c>_pipeline.ChangeStream(options)</c>.</summary>
      public override IAggregateFluent<ChangeStreamDocument<TResult>> ChangeStream(ChangeStreamStageOptions options = null)
      {
          return WithPipeline(_pipeline.ChangeStream(options));
      }

      /// <summary>Returns a new fluent whose pipeline is <c>_pipeline.Count()</c>.</summary>
      public override IAggregateFluent<AggregateCountResult> Count()
      {
          return WithPipeline(_pipeline.Count());
      }

      /// <summary>Returns a new fluent with a facet stage built from <paramref name="facets"/> and <paramref name="options"/>.</summary>
      public override IAggregateFluent<TNewResult> Facet<TNewResult>(
          IEnumerable<AggregateFacet<TResult>> facets,
          AggregateFacetOptions<TNewResult> options = null)
      {
          return WithPipeline(_pipeline.Facet(facets, options));
      }

      /// <summary>Returns a new fluent with a graph-lookup stage; all arguments are forwarded unchanged.</summary>
      public override IAggregateFluent<TNewResult> GraphLookup<TFrom, TConnectFrom, TConnectTo, TStartWith, TAsElement, TAs, TNewResult>(
          IMongoCollection<TFrom> from,
          FieldDefinition<TFrom, TConnectFrom> connectFromField,
          FieldDefinition<TFrom, TConnectTo> connectToField,
          AggregateExpressionDefinition<TResult, TStartWith> startWith,
          FieldDefinition<TNewResult, TAs> @as,
          FieldDefinition<TAsElement, int> depthField,
          AggregateGraphLookupOptions<TFrom, TAsElement, TNewResult> options = null)
      {
          return WithPipeline(_pipeline.GraphLookup(from, connectFromField, connectToField, startWith, @as, depthField, options));
      }

      /// <summary>Returns a new fluent whose pipeline is <c>_pipeline.Group(group)</c>.</summary>
      public override IAggregateFluent<TNewResult> Group<TNewResult>(ProjectionDefinition<TResult, TNewResult> group)
      {
          return WithPipeline(_pipeline.Group(group));
      }

      /// <summary>Returns a new fluent whose pipeline is <c>_pipeline.Limit(limit)</c>.</summary>
      public override IAggregateFluent<TResult> Limit(int limit)
      {
          return WithPipeline(_pipeline.Limit(limit));
      }

      /// <summary>
      /// Returns a new fluent with a lookup stage against the collection named
      /// <paramref name="foreignCollectionName"/> in this fluent's database.
      /// </summary>
      /// <param name="foreignCollectionName">Name of the foreign collection. Must not be null.</param>
      /// <param name="localField">The local field.</param>
      /// <param name="foreignField">The foreign field.</param>
      /// <param name="as">The field that receives the lookup result.</param>
      /// <param name="options">The lookup options, forwarded to the pipeline builder.</param>
      public override IAggregateFluent<TNewResult> Lookup<TForeignDocument, TNewResult>(string foreignCollectionName, FieldDefinition<TResult> localField, FieldDefinition<TForeignDocument> foreignField, FieldDefinition<TNewResult> @as, AggregateLookupOptions<TForeignDocument, TNewResult> options)
      {
          Ensure.IsNotNull(foreignCollectionName, nameof(foreignCollectionName));
          var foreignCollection = _collection.Database.GetCollection<TForeignDocument>(foreignCollectionName);
          return WithPipeline(_pipeline.Lookup(foreignCollection, localField, foreignField, @as, options));
      }

      /// <summary>
      /// Returns a new fluent with a lookup stage that runs <paramref name="lookupPipeline"/>
      /// against <paramref name="foreignCollection"/>.
      /// </summary>
      /// <param name="foreignCollection">The foreign collection. Must not be null.</param>
      /// <param name="let">The "let" document passed to the lookup stage.</param>
      /// <param name="lookupPipeline">The pipeline to run on the foreign collection.</param>
      /// <param name="as">The field that receives the lookup result.</param>
      /// <param name="options">The lookup options, forwarded to the pipeline builder.</param>
      public override IAggregateFluent<TNewResult> Lookup<TForeignDocument, TAsElement, TAs, TNewResult>(
          IMongoCollection<TForeignDocument> foreignCollection,
          BsonDocument let,
          PipelineDefinition<TForeignDocument, TAsElement> lookupPipeline,
          FieldDefinition<TNewResult, TAs> @as,
          AggregateLookupOptions<TForeignDocument, TNewResult> options = null)
      {
          Ensure.IsNotNull(foreignCollection, nameof(foreignCollection));
          // Forward options like every other options-taking method in this class;
          // previously the argument was accepted but silently dropped.
          return WithPipeline(_pipeline.Lookup(foreignCollection, let, lookupPipeline, @as, options));
      }

      /// <summary>Returns a new fluent whose pipeline is <c>_pipeline.Match(filter)</c>.</summary>
      public override IAggregateFluent<TResult> Match(FilterDefinition<TResult> filter)
      {
          return WithPipeline(_pipeline.Match(filter));
      }

      /// <summary>Returns a new fluent whose pipeline is <c>_pipeline.OfType(newResultSerializer)</c>.</summary>
      public override IAggregateFluent<TNewResult> OfType<TNewResult>(IBsonSerializer<TNewResult> newResultSerializer)
      {
          return WithPipeline(_pipeline.OfType(newResultSerializer));
      }

      /// <summary>
      /// Extends the pipeline with an out stage targeting the collection named
      /// <paramref name="collectionName"/> in this fluent's database, then executes it.
      /// </summary>
      /// <param name="collectionName">Name of the output collection. Must not be null.</param>
      /// <param name="cancellationToken">The cancellation token passed to <see cref="ToCursor"/>.</param>
      /// <returns>The cursor returned by executing the extended pipeline.</returns>
      public override IAsyncCursor<TResult> Out(string collectionName, CancellationToken cancellationToken)
      {
          Ensure.IsNotNull(collectionName, nameof(collectionName));
          var outputCollection = Database.GetCollection<TResult>(collectionName);
          var aggregate = WithPipeline(_pipeline.Out(outputCollection));
          return aggregate.ToCursor(cancellationToken);
      }

      /// <summary>
      /// Asynchronous counterpart of <see cref="Out"/>: extends the pipeline with an out
      /// stage and executes it via <see cref="ToCursorAsync"/>.
      /// </summary>
      /// <param name="collectionName">Name of the output collection. Must not be null.</param>
      /// <param name="cancellationToken">The cancellation token passed to <see cref="ToCursorAsync"/>.</param>
      /// <returns>A task whose result is the cursor returned by executing the extended pipeline.</returns>
      public override Task<IAsyncCursor<TResult>> OutAsync(string collectionName, CancellationToken cancellationToken)
      {
          Ensure.IsNotNull(collectionName, nameof(collectionName));
          var outputCollection = Database.GetCollection<TResult>(collectionName);
          var aggregate = WithPipeline(_pipeline.Out(outputCollection));
          return aggregate.ToCursorAsync(cancellationToken);
      }

      /// <summary>Returns a new fluent whose pipeline is <c>_pipeline.Project(projection)</c>.</summary>
      public override IAggregateFluent<TNewResult> Project<TNewResult>(ProjectionDefinition<TResult, TNewResult> projection)
      {
          return WithPipeline(_pipeline.Project(projection));
      }

      /// <summary>Returns a new fluent whose pipeline is <c>_pipeline.ReplaceRoot(newRoot)</c>.</summary>
      public override IAggregateFluent<TNewResult> ReplaceRoot<TNewResult>(AggregateExpressionDefinition<TResult, TNewResult> newRoot)
      {
          return WithPipeline(_pipeline.ReplaceRoot(newRoot));
      }

      /// <summary>Returns a new fluent whose pipeline is <c>_pipeline.Skip(skip)</c>.</summary>
      public override IAggregateFluent<TResult> Skip(int skip)
      {
          return WithPipeline(_pipeline.Skip(skip));
      }

      /// <summary>Returns a new fluent whose pipeline is <c>_pipeline.Sort(sort)</c>.</summary>
      public override IAggregateFluent<TResult> Sort(SortDefinition<TResult> sort)
      {
          return WithPipeline(_pipeline.Sort(sort));
      }

      /// <summary>Returns a new fluent whose pipeline is <c>_pipeline.SortByCount(id)</c>.</summary>
      public override IAggregateFluent<AggregateSortByCountResult<TId>> SortByCount<TId>(AggregateExpressionDefinition<TResult, TId> id)
      {
          return WithPipeline(_pipeline.SortByCount(id));
      }

      /// <summary>
      /// Replaces the final sort stage of the pipeline with one that combines the
      /// existing sort and <paramref name="newSort"/>, instead of appending a second sort stage.
      /// </summary>
      /// <param name="newSort">The sort to combine with the existing one. Must not be null.</param>
      /// <returns>A new fluent whose last stage is the combined sort.</returns>
      public override IOrderedAggregateFluent<TResult> ThenBy(SortDefinition<TResult> newSort)
      {
          Ensure.IsNotNull(newSort, nameof(newSort));
          var stages = _pipeline.Stages.ToList();
          // The last stage is expected to be a sort stage: the indexer throws when the
          // pipeline has no stages and the cast throws when the last stage is not a
          // SortPipelineStageDefinition<TResult>.
          var oldSortStage = (SortPipelineStageDefinition<TResult>)stages[stages.Count - 1];
          var oldSort = oldSortStage.Sort;
          var combinedSort = Builders<TResult>.Sort.Combine(oldSort, newSort);
          var combinedSortStage = PipelineStageDefinitionBuilder.Sort(combinedSort);
          stages[stages.Count - 1] = combinedSortStage;
          var newPipeline = new PipelineStagePipelineDefinition<TDocument, TResult>(stages);
          // NOTE(review): this cast presumably relies on AggregateFluentBase<TResult>
          // implementing IOrderedAggregateFluent<TResult>; the base class is not visible here.
          return (IOrderedAggregateFluent<TResult>)WithPipeline(newPipeline);
      }

      /// <summary>
      /// Returns a new fluent with an unwind stage on <paramref name="field"/>, wrapping
      /// <paramref name="newResultSerializer"/> in a new <see cref="AggregateUnwindOptions{TNewResult}"/>.
      /// </summary>
      public override IAggregateFluent<TNewResult> Unwind<TNewResult>(FieldDefinition<TResult> field, IBsonSerializer<TNewResult> newResultSerializer)
      {
          return WithPipeline(_pipeline.Unwind(field, new AggregateUnwindOptions<TNewResult> { ResultSerializer = newResultSerializer }));
      }

      /// <summary>Returns a new fluent with an unwind stage on <paramref name="field"/> using <paramref name="options"/>.</summary>
      public override IAggregateFluent<TNewResult> Unwind<TNewResult>(FieldDefinition<TResult> field, AggregateUnwindOptions<TNewResult> options)
      {
          return WithPipeline(_pipeline.Unwind(field, options));
      }

      /// <summary>
      /// Executes the pipeline on the collection, using the session overload of
      /// <c>Aggregate</c> only when a session was supplied to the constructor.
      /// </summary>
      /// <param name="cancellationToken">The cancellation token.</param>
      /// <returns>The cursor returned by the collection.</returns>
      public override IAsyncCursor<TResult> ToCursor(CancellationToken cancellationToken)
      {
          if (_session == null)
          {
              return _collection.Aggregate(_pipeline, _options, cancellationToken);
          }
          else
          {
              return _collection.Aggregate(_session, _pipeline, _options, cancellationToken);
          }
      }

      /// <summary>
      /// Asynchronously executes the pipeline on the collection, using the session overload
      /// of <c>AggregateAsync</c> only when a session was supplied to the constructor.
      /// </summary>
      /// <param name="cancellationToken">The cancellation token.</param>
      /// <returns>The task returned by the collection.</returns>
      public override Task<IAsyncCursor<TResult>> ToCursorAsync(CancellationToken cancellationToken)
      {
          if (_session == null)
          {
              return _collection.AggregateAsync(_pipeline, _options, cancellationToken);
          }
          else
          {
              return _collection.AggregateAsync(_session, _pipeline, _options, cancellationToken);
          }
      }

      /// <summary>Returns the pipeline's string form wrapped as <c>aggregate(...)</c>.</summary>
      public override string ToString()
      {
          return $"aggregate({_pipeline})";
      }

      /// <summary>
      /// Creates a new fluent that shares this instance's session, collection and
      /// options but uses <paramref name="pipeline"/> as its pipeline.
      /// </summary>
      /// <typeparam name="TNewResult">The output type of the new pipeline.</typeparam>
      /// <param name="pipeline">The pipeline for the new fluent.</param>
      public IAggregateFluent<TNewResult> WithPipeline<TNewResult>(PipelineDefinition<TDocument, TNewResult> pipeline)
      {
          return new AggregateFluent<TDocument, TNewResult>(_session, _collection, pipeline, _options);
      }
  }
  233. }