ChangeStreamHelper.cs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. /* Copyright 2018-present MongoDB Inc.
  2. *
  3. * Licensed under the Apache License, Version 2.0 (the "License");
  4. * you may not use this file except in compliance with the License.
  5. * You may obtain a copy of the License at
  6. *
  7. * http://www.apache.org/licenses/LICENSE-2.0
  8. *
  9. * Unless required by applicable law or agreed to in writing, software
  10. * distributed under the License is distributed on an "AS IS" BASIS,
  11. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. * See the License for the specific language governing permissions and
  13. * limitations under the License.
  14. */
  15. using MongoDB.Bson;
  16. using MongoDB.Bson.Serialization;
  17. using MongoDB.Bson.Serialization.Serializers;
  18. using MongoDB.Driver.Core.Operations;
  19. using MongoDB.Driver.Core.WireProtocol.Messages.Encoders;
  20. namespace MongoDB.Driver
  21. {
  22. internal static class ChangeStreamHelper
  23. {
  24. // public static methods
  25. public static ChangeStreamOperation<TResult> CreateChangeStreamOperation<TResult>(
  26. PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
  27. ChangeStreamOptions options,
  28. ReadConcern readConcern,
  29. MessageEncoderSettings messageEncoderSettings)
  30. {
  31. var renderedPipeline = RenderPipeline(pipeline, BsonDocumentSerializer.Instance);
  32. var operation = new ChangeStreamOperation<TResult>(
  33. renderedPipeline.Documents,
  34. renderedPipeline.OutputSerializer,
  35. messageEncoderSettings);
  36. SetOperationOptions(operation, options, readConcern);
  37. return operation;
  38. }
  39. public static ChangeStreamOperation<TResult> CreateChangeStreamOperation<TResult>(
  40. IMongoDatabase database,
  41. PipelineDefinition<ChangeStreamDocument<BsonDocument>, TResult> pipeline,
  42. ChangeStreamOptions options,
  43. ReadConcern readConcern,
  44. MessageEncoderSettings messageEncoderSettings)
  45. {
  46. var renderedPipeline = RenderPipeline(pipeline, BsonDocumentSerializer.Instance);
  47. var operation = new ChangeStreamOperation<TResult>(
  48. database.DatabaseNamespace,
  49. renderedPipeline.Documents,
  50. renderedPipeline.OutputSerializer,
  51. messageEncoderSettings);
  52. SetOperationOptions(operation, options, readConcern);
  53. return operation;
  54. }
  55. public static ChangeStreamOperation<TResult> CreateChangeStreamOperation<TResult, TDocument>(
  56. IMongoCollection<TDocument> collection,
  57. PipelineDefinition<ChangeStreamDocument<TDocument>, TResult> pipeline,
  58. IBsonSerializer<TDocument> documentSerializer,
  59. ChangeStreamOptions options,
  60. ReadConcern readConcern,
  61. MessageEncoderSettings messageEncoderSettings)
  62. {
  63. var renderedPipeline = RenderPipeline(pipeline, documentSerializer);
  64. var operation = new ChangeStreamOperation<TResult>(
  65. collection.CollectionNamespace,
  66. renderedPipeline.Documents,
  67. renderedPipeline.OutputSerializer,
  68. messageEncoderSettings);
  69. SetOperationOptions(operation, options, readConcern);
  70. return operation;
  71. }
  72. // private static methods
  73. private static RenderedPipelineDefinition<TResult> RenderPipeline<TResult, TDocument>(
  74. PipelineDefinition<ChangeStreamDocument<TDocument>, TResult> pipeline,
  75. IBsonSerializer<TDocument> documentSerializer)
  76. {
  77. var changeStreamDocumentSerializer = new ChangeStreamDocumentSerializer<TDocument>(documentSerializer);
  78. var serializerRegistry = BsonSerializer.SerializerRegistry;
  79. return pipeline.Render(changeStreamDocumentSerializer, serializerRegistry);
  80. }
  81. private static void SetOperationOptions<TResult>(
  82. ChangeStreamOperation<TResult> operation,
  83. ChangeStreamOptions options,
  84. ReadConcern readConcern)
  85. {
  86. options = options ?? new ChangeStreamOptions();
  87. operation.BatchSize = options.BatchSize;
  88. operation.Collation = options.Collation;
  89. operation.FullDocument = options.FullDocument;
  90. operation.MaxAwaitTime = options.MaxAwaitTime;
  91. operation.ReadConcern = readConcern;
  92. operation.ResumeAfter = options.ResumeAfter;
  93. operation.StartAtOperationTime = options.StartAtOperationTime;
  94. }
  95. }
  96. }