/* Copyright 2010-present MongoDB Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using MongoDB.Bson; using MongoDB.Bson.Serialization; using MongoDB.Driver.Core.Misc; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace MongoDB.Driver { internal class AggregateFluent : AggregateFluentBase { // fields private readonly IMongoCollection _collection; private readonly AggregateOptions _options; private readonly PipelineDefinition _pipeline; private readonly IClientSessionHandle _session; // constructors public AggregateFluent(IClientSessionHandle session, IMongoCollection collection, PipelineDefinition pipeline, AggregateOptions options) { _session = session; // can be null _collection = Ensure.IsNotNull(collection, nameof(collection)); _pipeline = Ensure.IsNotNull(pipeline, nameof(pipeline)); _options = Ensure.IsNotNull(options, nameof(options)); } // properties public override IMongoDatabase Database { get { return _collection.Database; } } public override AggregateOptions Options { get { return _options; } } public override IList Stages { get { return _pipeline.Stages.ToList(); } } // methods public override IAggregateFluent AppendStage(PipelineStageDefinition stage) { return WithPipeline(_pipeline.AppendStage(stage)); } public override IAggregateFluent As(IBsonSerializer newResultSerializer) { return WithPipeline(_pipeline.As(newResultSerializer)); } public override IAggregateFluent> Bucket( AggregateExpressionDefinition groupBy, IEnumerable boundaries, AggregateBucketOptions options = null) { return WithPipeline(_pipeline.Bucket(groupBy, boundaries, options)); } public override IAggregateFluent Bucket( AggregateExpressionDefinition groupBy, IEnumerable boundaries, ProjectionDefinition output, AggregateBucketOptions options = null) { return WithPipeline(_pipeline.Bucket(groupBy, boundaries, output, options)); } public override IAggregateFluent> BucketAuto( AggregateExpressionDefinition groupBy, int buckets, AggregateBucketAutoOptions options = null) { return WithPipeline(_pipeline.BucketAuto(groupBy, buckets, options)); } public override IAggregateFluent BucketAuto( AggregateExpressionDefinition groupBy, int buckets, ProjectionDefinition output, AggregateBucketAutoOptions options = null) { return WithPipeline(_pipeline.BucketAuto(groupBy, buckets, output, options)); } public override IAggregateFluent> ChangeStream(ChangeStreamStageOptions options = null) { return WithPipeline(_pipeline.ChangeStream(options)); } public override IAggregateFluent Count() { return WithPipeline(_pipeline.Count()); } public override IAggregateFluent Facet( IEnumerable> facets, AggregateFacetOptions options = null) { return WithPipeline(_pipeline.Facet(facets, options)); } public override IAggregateFluent GraphLookup( IMongoCollection from, FieldDefinition connectFromField, FieldDefinition connectToField, AggregateExpressionDefinition startWith, FieldDefinition @as, FieldDefinition depthField, AggregateGraphLookupOptions options = null) { return WithPipeline(_pipeline.GraphLookup(from, connectFromField, connectToField, startWith, @as, depthField, options)); } public override IAggregateFluent Group(ProjectionDefinition group) { return WithPipeline(_pipeline.Group(group)); } public override IAggregateFluent Limit(int limit) { return WithPipeline(_pipeline.Limit(limit)); } public override IAggregateFluent Lookup(string foreignCollectionName, FieldDefinition localField, FieldDefinition foreignField, FieldDefinition @as, AggregateLookupOptions options) { Ensure.IsNotNull(foreignCollectionName, nameof(foreignCollectionName)); var foreignCollection = _collection.Database.GetCollection(foreignCollectionName); return WithPipeline(_pipeline.Lookup(foreignCollection, localField, foreignField, @as, options)); } public override IAggregateFluent Lookup( IMongoCollection foreignCollection, BsonDocument let, PipelineDefinition lookupPipeline, FieldDefinition @as, AggregateLookupOptions options = null) { Ensure.IsNotNull(foreignCollection, nameof(foreignCollection)); return WithPipeline(_pipeline.Lookup(foreignCollection, let, lookupPipeline, @as)); } public override IAggregateFluent Match(FilterDefinition filter) { return WithPipeline(_pipeline.Match(filter)); } public override IAggregateFluent OfType(IBsonSerializer newResultSerializer) { return WithPipeline(_pipeline.OfType(newResultSerializer)); } public override IAsyncCursor Out(string collectionName, CancellationToken cancellationToken) { Ensure.IsNotNull(collectionName, nameof(collectionName)); var outputCollection = Database.GetCollection(collectionName); var aggregate = WithPipeline(_pipeline.Out(outputCollection)); return aggregate.ToCursor(cancellationToken); } public override Task> OutAsync(string collectionName, CancellationToken cancellationToken) { Ensure.IsNotNull(collectionName, nameof(collectionName)); var outputCollection = Database.GetCollection(collectionName); var aggregate = WithPipeline(_pipeline.Out(outputCollection)); return aggregate.ToCursorAsync(cancellationToken); } public override IAggregateFluent Project(ProjectionDefinition projection) { return WithPipeline(_pipeline.Project(projection)); } public override IAggregateFluent ReplaceRoot(AggregateExpressionDefinition newRoot) { return WithPipeline(_pipeline.ReplaceRoot(newRoot)); } public override IAggregateFluent Skip(int skip) { return WithPipeline(_pipeline.Skip(skip)); } public override IAggregateFluent Sort(SortDefinition sort) { return WithPipeline(_pipeline.Sort(sort)); } public override IAggregateFluent> SortByCount(AggregateExpressionDefinition id) { return WithPipeline(_pipeline.SortByCount(id)); } public override IOrderedAggregateFluent ThenBy(SortDefinition newSort) { Ensure.IsNotNull(newSort, nameof(newSort)); var stages = _pipeline.Stages.ToList(); var oldSortStage = (SortPipelineStageDefinition)stages[stages.Count - 1]; var oldSort = oldSortStage.Sort; var combinedSort = Builders.Sort.Combine(oldSort, newSort); var combinedSortStage = PipelineStageDefinitionBuilder.Sort(combinedSort); stages[stages.Count - 1] = combinedSortStage; var newPipeline = new PipelineStagePipelineDefinition(stages); return (IOrderedAggregateFluent)WithPipeline(newPipeline); } public override IAggregateFluent Unwind(FieldDefinition field, IBsonSerializer newResultSerializer) { return WithPipeline(_pipeline.Unwind(field, new AggregateUnwindOptions { ResultSerializer = newResultSerializer })); } public override IAggregateFluent Unwind(FieldDefinition field, AggregateUnwindOptions options) { return WithPipeline(_pipeline.Unwind(field, options)); } public override IAsyncCursor ToCursor(CancellationToken cancellationToken) { if (_session == null) { return _collection.Aggregate(_pipeline, _options, cancellationToken); } else { return _collection.Aggregate(_session, _pipeline, _options, cancellationToken); } } public override Task> ToCursorAsync(CancellationToken cancellationToken) { if (_session == null) { return _collection.AggregateAsync(_pipeline, _options, cancellationToken); } else { return _collection.AggregateAsync(_session, _pipeline, _options, cancellationToken); } } public override string ToString() { return $"aggregate({_pipeline})"; } public IAggregateFluent WithPipeline(PipelineDefinition pipeline) { return new AggregateFluent(_session, _collection, pipeline, _options); } } }