PipelineDefinition.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. /* Copyright 2010-present MongoDB Inc.
  2. *
  3. * Licensed under the Apache License, Version 2.0 (the "License");
  4. * you may not use this file except in compliance with the License.
  5. * You may obtain a copy of the License at
  6. *
  7. * http://www.apache.org/licenses/LICENSE-2.0
  8. *
  9. * Unless required by applicable law or agreed to in writing, software
  10. * distributed under the License is distributed on an "AS IS" BASIS,
  11. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. * See the License for the specific language governing permissions and
  13. * limitations under the License.
  14. */
  15. using System;
  16. using System.Collections.Generic;
  17. using System.Linq;
  18. using MongoDB.Bson;
  19. using MongoDB.Bson.Serialization;
  20. using MongoDB.Bson.Serialization.Serializers;
  21. using MongoDB.Driver.Core.Misc;
  22. namespace MongoDB.Driver
  23. {
  /// <summary>
  /// The result of rendering a pipeline: the stage documents together with the
  /// serializer used for the pipeline's output.
  /// </summary>
  /// <typeparam name="TOutput">The type of the output.</typeparam>
  public class RenderedPipelineDefinition<TOutput>
  {
      private readonly List<BsonDocument> _documents;
      private readonly IBsonSerializer<TOutput> _outputSerializer;

      /// <summary>
      /// Initializes a new instance of the <see cref="RenderedPipelineDefinition{TOutput}"/> class.
      /// </summary>
      /// <param name="documents">The rendered stage documents. They are copied into a new list.</param>
      /// <param name="outputSerializer">The output serializer.</param>
      public RenderedPipelineDefinition(IEnumerable<BsonDocument> documents, IBsonSerializer<TOutput> outputSerializer)
      {
          // Copy the sequence so the instance owns its own list of stage documents.
          var stageDocuments = Ensure.IsNotNull(documents, nameof(documents));
          _documents = new List<BsonDocument>(stageDocuments);
          _outputSerializer = Ensure.IsNotNull(outputSerializer, nameof(outputSerializer));
      }

      /// <summary>
      /// Gets the rendered stage documents.
      /// </summary>
      public IList<BsonDocument> Documents => _documents;

      /// <summary>
      /// Gets the output serializer.
      /// </summary>
      public IBsonSerializer<TOutput> OutputSerializer => _outputSerializer;
  }
  /// <summary>
  /// Abstract base for all pipeline definitions. Also hosts the factory methods and
  /// implicit conversions used to build a pipeline from stages, documents or JSON strings.
  /// </summary>
  /// <typeparam name="TInput">The type of the input.</typeparam>
  /// <typeparam name="TOutput">The type of the output.</typeparam>
  public abstract class PipelineDefinition<TInput, TOutput>
  {
      /// <summary>
      /// Gets the serializer for the pipeline's output.
      /// </summary>
      public abstract IBsonSerializer<TOutput> OutputSerializer { get; }

      /// <summary>
      /// Gets the stages that make up the pipeline.
      /// </summary>
      public abstract IEnumerable<IPipelineStageDefinition> Stages { get; }

      /// <summary>
      /// Renders the pipeline.
      /// </summary>
      /// <param name="inputSerializer">The input serializer.</param>
      /// <param name="serializerRegistry">The serializer registry.</param>
      /// <returns>A <see cref="RenderedPipelineDefinition{TOutput}"/></returns>
      public abstract RenderedPipelineDefinition<TOutput> Render(IBsonSerializer<TInput> inputSerializer, IBsonSerializerRegistry serializerRegistry);

      /// <inheritdoc/>
      public override string ToString()
      {
          // Fall back to the global registry and its serializer for TInput.
          var registry = BsonSerializer.SerializerRegistry;
          return ToString(registry.GetSerializer<TInput>(), registry);
      }

      /// <summary>
      /// Renders the pipeline and formats it as a bracketed, comma-separated list of
      /// the JSON representation of each rendered stage document.
      /// </summary>
      /// <param name="inputSerializer">The input serializer.</param>
      /// <param name="serializerRegistry">The serializer registry.</param>
      /// <returns>
      /// A <see cref="System.String" /> that represents this instance.
      /// </returns>
      public string ToString(IBsonSerializer<TInput> inputSerializer, IBsonSerializerRegistry serializerRegistry)
      {
          var stageJson = Render(inputSerializer, serializerRegistry)
              .Documents
              .Select(document => document.ToJson());
          var joined = string.Join(", ", stageJson);
          return $"[{joined}]";
      }

      /// <summary>
      /// Creates a pipeline from stage definitions.
      /// </summary>
      /// <param name="stages">The stages.</param>
      /// <param name="outputSerializer">The output serializer.</param>
      /// <returns>
      /// A <see cref="PipelineDefinition{TInput, TOutput}"/>, or null when <paramref name="stages"/> is null.
      /// </returns>
      public static PipelineDefinition<TInput, TOutput> Create(
          IEnumerable<IPipelineStageDefinition> stages,
          IBsonSerializer<TOutput> outputSerializer = null)
      {
          return stages == null
              ? null
              : new PipelineStagePipelineDefinition<TInput, TOutput>(stages, outputSerializer);
      }

      /// <summary>
      /// Creates a pipeline from stage documents.
      /// </summary>
      /// <param name="stages">The stages.</param>
      /// <param name="outputSerializer">The output serializer.</param>
      /// <returns>
      /// A <see cref="PipelineDefinition{TInput, TOutput}"/>, or null when <paramref name="stages"/> is null.
      /// </returns>
      public static PipelineDefinition<TInput, TOutput> Create(
          IEnumerable<BsonDocument> stages,
          IBsonSerializer<TOutput> outputSerializer = null)
      {
          return stages == null
              ? null
              : new BsonDocumentStagePipelineDefinition<TInput, TOutput>(stages, outputSerializer);
      }

      /// <summary>
      /// Creates a pipeline from stages written as JSON strings; each string is parsed
      /// into a <see cref="BsonDocument"/>.
      /// </summary>
      /// <param name="stages">The stages.</param>
      /// <param name="outputSerializer">The output serializer.</param>
      /// <returns>
      /// A <see cref="PipelineDefinition{TInput, TOutput}"/>, or null when <paramref name="stages"/> is null.
      /// </returns>
      public static PipelineDefinition<TInput, TOutput> Create(
          IEnumerable<string> stages,
          IBsonSerializer<TOutput> outputSerializer = null)
      {
          // A null sequence stays null, so the document overload returns null for it.
          var documents = stages?.Select(json => BsonDocument.Parse(json));
          return Create(documents, outputSerializer);
      }

      /// <summary>
      /// Creates a pipeline from stage documents.
      /// </summary>
      /// <param name="stages">The stages.</param>
      /// <returns>A <see cref="PipelineDefinition{TInput, TOutput}"/>.</returns>
      public static PipelineDefinition<TInput, TOutput> Create(
          params BsonDocument[] stages) => Create((IEnumerable<BsonDocument>)stages);

      /// <summary>
      /// Creates a pipeline from stages written as JSON strings.
      /// </summary>
      /// <param name="stages">The stages.</param>
      /// <returns>A <see cref="PipelineDefinition{TInput, TOutput}"/>.</returns>
      public static PipelineDefinition<TInput, TOutput> Create(
          params string[] stages) => Create((IEnumerable<string>)stages);

      /// <summary>
      /// Performs an implicit conversion from <see cref="IPipelineStageDefinition"/>[] to <see cref="PipelineDefinition{TInput, TOutput}"/>.
      /// </summary>
      /// <param name="stages">The stages.</param>
      /// <returns>
      /// The result of the conversion.
      /// </returns>
      public static implicit operator PipelineDefinition<TInput, TOutput>(IPipelineStageDefinition[] stages) => Create(stages);

      /// <summary>
      /// Performs an implicit conversion from <see cref="List{IPipelineStageDefinition}"/> to <see cref="PipelineDefinition{TInput, TOutput}"/>.
      /// </summary>
      /// <param name="stages">The stages.</param>
      /// <returns>
      /// The result of the conversion.
      /// </returns>
      public static implicit operator PipelineDefinition<TInput, TOutput>(List<IPipelineStageDefinition> stages) => Create(stages);

      /// <summary>
      /// Performs an implicit conversion from <see cref="BsonDocument"/>[] to <see cref="PipelineDefinition{TInput, TOutput}"/>.
      /// </summary>
      /// <param name="stages">The stages.</param>
      /// <returns>
      /// The result of the conversion.
      /// </returns>
      public static implicit operator PipelineDefinition<TInput, TOutput>(BsonDocument[] stages) => Create(stages);

      /// <summary>
      /// Performs an implicit conversion from <see cref="List{BsonDocument}"/> to <see cref="PipelineDefinition{TInput, TOutput}"/>.
      /// </summary>
      /// <param name="stages">The stages.</param>
      /// <returns>
      /// The result of the conversion.
      /// </returns>
      public static implicit operator PipelineDefinition<TInput, TOutput>(List<BsonDocument> stages) => Create(stages);
  }
  /// <summary>
  /// A pipeline whose stages are supplied directly as <see cref="BsonDocument"/> instances.
  /// </summary>
  /// <typeparam name="TInput">The type of the input.</typeparam>
  /// <typeparam name="TOutput">The type of the output.</typeparam>
  public sealed class BsonDocumentStagePipelineDefinition<TInput, TOutput> : PipelineDefinition<TInput, TOutput>
  {
      private readonly IBsonSerializer<TOutput> _outputSerializer;
      private readonly List<BsonDocument> _stages;

      /// <summary>
      /// Initializes a new instance of the <see cref="BsonDocumentStagePipelineDefinition{TInput, TOutput}"/> class.
      /// </summary>
      /// <param name="stages">The stage documents. They are copied into a new list.</param>
      /// <param name="outputSerializer">The output serializer; may be null.</param>
      public BsonDocumentStagePipelineDefinition(IEnumerable<BsonDocument> stages, IBsonSerializer<TOutput> outputSerializer = null)
      {
          var stageDocuments = Ensure.IsNotNull(stages, nameof(stages));
          _stages = new List<BsonDocument>(stageDocuments);
          _outputSerializer = outputSerializer;
      }

      /// <inheritdoc />
      public override IBsonSerializer<TOutput> OutputSerializer
      {
          get { return _outputSerializer; }
      }

      /// <summary>
      /// Gets the stage documents.
      /// </summary>
      public IList<BsonDocument> Documents => _stages;

      /// <inheritdoc />
      public override IEnumerable<IPipelineStageDefinition> Stages
      {
          get
          {
              // Lazily wraps each document in a stage definition every time the property is enumerated.
              return _stages.Select(document => new BsonDocumentPipelineStageDefinition<TInput, TOutput>(document, _outputSerializer));
          }
      }

      /// <inheritdoc />
      public override RenderedPipelineDefinition<TOutput> Render(IBsonSerializer<TInput> inputSerializer, IBsonSerializerRegistry serializerRegistry)
      {
          // Serializer precedence: the explicit one, then the input serializer when it
          // already handles TOutput, then whatever the registry provides for TOutput.
          var resolvedSerializer = _outputSerializer
              ?? (inputSerializer as IBsonSerializer<TOutput>)
              ?? serializerRegistry.GetSerializer<TOutput>();

          return new RenderedPipelineDefinition<TOutput>(_stages, resolvedSerializer);
      }
  }
  /// <summary>
  /// A pipeline composed of instances of <see cref="IPipelineStageDefinition" />.
  /// </summary>
  /// <typeparam name="TInput">The type of the input.</typeparam>
  /// <typeparam name="TOutput">The type of the output.</typeparam>
  public sealed class PipelineStagePipelineDefinition<TInput, TOutput> : PipelineDefinition<TInput, TOutput>
  {
      private readonly IList<IPipelineStageDefinition> _stages;
      private readonly IBsonSerializer<TOutput> _outputSerializer;

      /// <summary>
      /// Initializes a new instance of the <see cref="PipelineStagePipelineDefinition{TInput, TOutput}"/> class.
      /// </summary>
      /// <param name="stages">The stages. They are copied into a new list and their input/output types are verified.</param>
      /// <param name="outputSerializer">The output serializer; may be null.</param>
      /// <exception cref="ArgumentException">
      /// The stage types do not chain from <typeparamref name="TInput"/> to <typeparamref name="TOutput"/>.
      /// </exception>
      public PipelineStagePipelineDefinition(IEnumerable<IPipelineStageDefinition> stages, IBsonSerializer<TOutput> outputSerializer = null)
      {
          _stages = VerifyStages(Ensure.IsNotNull(stages, nameof(stages)).ToList());
          _outputSerializer = outputSerializer;
      }

      /// <inheritdoc />
      public override IBsonSerializer<TOutput> OutputSerializer => _outputSerializer;

      /// <summary>
      /// Gets the serializer.
      /// </summary>
      [Obsolete("Use OutputSerializer instead.")]
      public IBsonSerializer<TOutput> Serializer
      {
          get { return _outputSerializer; }
      }

      /// <summary>
      /// Gets the stages.
      /// </summary>
      public override IEnumerable<IPipelineStageDefinition> Stages => _stages;

      /// <inheritdoc />
      public override RenderedPipelineDefinition<TOutput> Render(IBsonSerializer<TInput> inputSerializer, IBsonSerializerRegistry serializerRegistry)
      {
          var pipeline = new List<BsonDocument>();

          // Each stage is rendered with the output serializer of the stage before it.
          IBsonSerializer currentSerializer = inputSerializer;
          foreach (var stage in _stages)
          {
              var renderedStage = stage.Render(currentSerializer, serializerRegistry);
              currentSerializer = renderedStage.OutputSerializer;

              // Stages that render to an empty document are left out of the pipeline.
              if (renderedStage.Document.ElementCount > 0)
              {
                  pipeline.Add(renderedStage.Document);
              }
          }

          // Serializer precedence: the explicit one, then the last stage's serializer when
          // it handles TOutput, then whatever the registry provides for TOutput.
          return new RenderedPipelineDefinition<TOutput>(
              pipeline,
              _outputSerializer ?? (currentSerializer as IBsonSerializer<TOutput>) ?? serializerRegistry.GetSerializer<TOutput>());
      }

      /// <summary>
      /// Verifies that the stages chain together: the first stage must accept
      /// <typeparamref name="TInput"/>, each following stage must accept the output type of
      /// the stage before it, and the final output type must be <typeparamref name="TOutput"/>.
      /// </summary>
      /// <param name="stages">The stages to verify.</param>
      /// <returns>The same list, when verification succeeds.</returns>
      /// <exception cref="ArgumentException">The stage types do not chain as described.</exception>
      private static List<IPipelineStageDefinition> VerifyStages(List<IPipelineStageDefinition> stages)
      {
          var nextInputType = typeof(TInput);
          for (int i = 0; i < stages.Count; i++)
          {
              if (stages[i].InputType != nextInputType)
              {
                  var message = string.Format(
                      "The input type to stage[{0}] was expected to be {1}, but was {2}.",
                      i,
                      nextInputType,
                      stages[i].InputType);
                  throw new ArgumentException(message, nameof(stages));
              }

              nextInputType = stages[i].OutputType;
          }

          // After the loop nextInputType is the output type of the last stage, or TInput
          // when there are no stages. Report TOutput as the expected type and
          // nextInputType as the actual one, and avoid stages.Last(), which throws
          // InvalidOperationException on an empty list.
          if (nextInputType != typeof(TOutput))
          {
              var message = string.Format(
                  "The output type to the last stage was expected to be {0}, but was {1}.",
                  typeof(TOutput),
                  nextInputType);
              throw new ArgumentException(message, nameof(stages));
          }

          return stages;
      }
  }
  /// <summary>
  /// Wraps another pipeline and, after rendering it, merges the first two rendered
  /// stages into a single $match stage when both of them are $match stages.
  /// </summary>
  /// <typeparam name="TInput">The type of the input.</typeparam>
  /// <typeparam name="TOutput">The type of the output.</typeparam>
  internal class OptimizingPipelineDefinition<TInput, TOutput> : PipelineDefinition<TInput, TOutput>
  {
      private readonly PipelineDefinition<TInput, TOutput> _wrapped;

      /// <summary>
      /// Initializes a new instance of the <see cref="OptimizingPipelineDefinition{TInput, TOutput}"/> class.
      /// </summary>
      /// <param name="wrapped">The pipeline to wrap.</param>
      public OptimizingPipelineDefinition(PipelineDefinition<TInput, TOutput> wrapped)
      {
          // Fail at construction rather than with a NullReferenceException on first use.
          _wrapped = Ensure.IsNotNull(wrapped, nameof(wrapped));
      }

      /// <inheritdoc />
      public override IBsonSerializer<TOutput> OutputSerializer => _wrapped.OutputSerializer;

      /// <inheritdoc />
      public override IEnumerable<IPipelineStageDefinition> Stages => _wrapped.Stages;

      /// <inheritdoc />
      public override RenderedPipelineDefinition<TOutput> Render(IBsonSerializer<TInput> inputSerializer, IBsonSerializerRegistry serializerRegistry)
      {
          var rendered = _wrapped.Render(inputSerializer, serializerRegistry);
          var documents = rendered.Documents;

          // do some combining of $match documents if possible. This is optimized for the
          // OfType case where we've added a discriminator as a match at the beginning of the pipeline.
          if (documents.Count > 1)
          {
              var firstFilter = GetMatchFilter(documents[0]);
              var secondFilter = GetMatchFilter(documents[1]);

              // Only combine when both leading stages are $match stages with document filters;
              // anything else is passed through untouched.
              if (firstFilter != null && secondFilter != null)
              {
                  var combinedFilter = Builders<BsonDocument>.Filter.And(firstFilter, secondFilter);
                  var combinedStage = new BsonDocument("$match", combinedFilter.Render(BsonDocumentSerializer.Instance, serializerRegistry));
                  documents[0] = combinedStage;
                  documents.RemoveAt(1);
              }
          }

          return rendered;
      }

      /// <summary>
      /// Returns the filter document of a rendered $match stage, or null when the stage is
      /// null, has no elements, is not a $match stage, or its value is not a document.
      /// </summary>
      /// <param name="stage">The rendered stage document.</param>
      /// <returns>The $match filter document, or null.</returns>
      private static BsonDocument GetMatchFilter(BsonDocument stage)
      {
          // Check for elements first: GetElement(0) is not valid on an empty document.
          if (stage == null || stage.ElementCount == 0)
          {
              return null;
          }

          var element = stage.GetElement(0);
          return element.Name == "$match" ? element.Value as BsonDocument : null;
      }
  }
  357. }