MongoCollectionBase.cs 43 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850
  1. /* Copyright 2010-present MongoDB Inc.
  2. *
  3. * Licensed under the Apache License, Version 2.0 (the "License");
  4. * you may not use this file except in compliance with the License.
  5. * You may obtain a copy of the License at
  6. *
  7. * http://www.apache.org/licenses/LICENSE-2.0
  8. *
  9. * Unless required by applicable law or agreed to in writing, software
  10. * distributed under the License is distributed on an "AS IS" BASIS,
  11. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. * See the License for the specific language governing permissions and
  13. * limitations under the License.
  14. */
  15. using System;
  16. using System.Collections.Generic;
  17. using System.Linq;
  18. using System.Threading;
  19. using System.Threading.Tasks;
  20. using MongoDB.Bson;
  21. using MongoDB.Bson.Serialization;
  22. using MongoDB.Driver.Core.Misc;
  23. using MongoDB.Driver.Linq;
  24. namespace MongoDB.Driver
  25. {
  26. /// <summary>
  27. /// Base class for implementors of <see cref="IMongoCollection{TDocument}"/>.
  28. /// </summary>
  29. /// <typeparam name="TDocument">The type of the document.</typeparam>
  30. public abstract class MongoCollectionBase<TDocument> : IMongoCollection<TDocument>
  31. {
  32. /// <inheritdoc />
  33. public abstract CollectionNamespace CollectionNamespace { get; }
  34. /// <inheritdoc />
  35. public abstract IMongoDatabase Database { get; }
  36. /// <inheritdoc />
  37. public abstract IBsonSerializer<TDocument> DocumentSerializer { get; }
  38. /// <inheritdoc />
  39. public abstract IMongoIndexManager<TDocument> Indexes { get; }
  40. /// <inheritdoc />
  41. public abstract MongoCollectionSettings Settings { get; }
  42. /// <inheritdoc />
  43. public virtual IAsyncCursor<TResult> Aggregate<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  44. {
  45. throw new NotImplementedException();
  46. }
  47. /// <inheritdoc />
  48. public virtual IAsyncCursor<TResult> Aggregate<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  49. {
  50. throw new NotImplementedException();
  51. }
  52. /// <inheritdoc />
  53. public abstract Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
  54. /// <inheritdoc />
  55. public virtual Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  56. {
  57. throw new NotImplementedException();
  58. }
  59. /// <inheritdoc />
  60. public virtual BulkWriteResult<TDocument> BulkWrite(IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  61. {
  62. throw new NotImplementedException();
  63. }
  64. /// <inheritdoc />
  65. public virtual BulkWriteResult<TDocument> BulkWrite(IClientSessionHandle session, IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  66. {
  67. throw new NotImplementedException();
  68. }
  69. /// <inheritdoc />
  70. public abstract Task<BulkWriteResult<TDocument>> BulkWriteAsync(IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
  71. /// <inheritdoc />
  72. public virtual Task<BulkWriteResult<TDocument>> BulkWriteAsync(IClientSessionHandle session, IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  73. {
  74. throw new NotImplementedException();
  75. }
  76. /// <inheritdoc />
  77. [Obsolete("Use CountDocuments or EstimatedDocumentCount instead.")]
  78. public virtual long Count(FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  79. {
  80. throw new NotImplementedException();
  81. }
  82. /// <inheritdoc />
  83. [Obsolete("Use CountDocuments or EstimatedDocumentCount instead.")]
  84. public virtual long Count(IClientSessionHandle session, FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  85. {
  86. throw new NotImplementedException();
  87. }
  88. /// <inheritdoc />
  89. [Obsolete("Use CountDocumentsAsync or EstimatedDocumentCountAsync instead.")]
  90. public abstract Task<long> CountAsync(FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
  91. /// <inheritdoc />
  92. [Obsolete("Use CountDocumentsAsync or EstimatedDocumentCountAsync instead.")]
  93. public virtual Task<long> CountAsync(IClientSessionHandle session, FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  94. {
  95. throw new NotImplementedException();
  96. }
  97. /// <inheritdoc />
  98. public virtual long CountDocuments(FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  99. {
  100. throw new NotImplementedException();
  101. }
  102. /// <inheritdoc />
  103. public virtual long CountDocuments(IClientSessionHandle session, FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  104. {
  105. throw new NotImplementedException();
  106. }
  107. /// <inheritdoc />
  108. public virtual Task<long> CountDocumentsAsync(FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  109. {
  110. throw new NotImplementedException();
  111. }
  112. /// <inheritdoc />
  113. public virtual Task<long> CountDocumentsAsync(IClientSessionHandle session, FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  114. {
  115. throw new NotImplementedException();
  116. }
  117. /// <inheritdoc />
  118. public virtual DeleteResult DeleteMany(FilterDefinition<TDocument> filter, CancellationToken cancellationToken = default(CancellationToken))
  119. {
  120. return DeleteMany(filter, null, cancellationToken);
  121. }
  122. /// <inheritdoc />
  123. public virtual DeleteResult DeleteMany(FilterDefinition<TDocument> filter, DeleteOptions options, CancellationToken cancellationToken = default(CancellationToken))
  124. {
  125. return DeleteMany(filter, options, requests => BulkWrite(requests, null, cancellationToken));
  126. }
  127. /// <inheritdoc />
  128. public virtual DeleteResult DeleteMany(IClientSessionHandle session, FilterDefinition<TDocument> filter, DeleteOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  129. {
  130. return DeleteMany(filter, options, requests => BulkWrite(session, requests, null, cancellationToken));
  131. }
  132. private DeleteResult DeleteMany(FilterDefinition<TDocument> filter, DeleteOptions options, Func<IEnumerable<WriteModel<TDocument>>, BulkWriteResult> bulkWriteFunc)
  133. {
  134. Ensure.IsNotNull(filter, nameof(filter));
  135. options = options ?? new DeleteOptions();
  136. var model = new DeleteManyModel<TDocument>(filter)
  137. {
  138. Collation = options.Collation
  139. };
  140. try
  141. {
  142. var result = bulkWriteFunc(new[] { model });
  143. return DeleteResult.FromCore(result);
  144. }
  145. catch (MongoBulkWriteException<TDocument> ex)
  146. {
  147. throw MongoWriteException.FromBulkWriteException(ex);
  148. }
  149. }
  150. /// <inheritdoc />
  151. public virtual Task<DeleteResult> DeleteManyAsync(FilterDefinition<TDocument> filter, CancellationToken cancellationToken = default(CancellationToken))
  152. {
  153. return DeleteManyAsync(filter, null, cancellationToken);
  154. }
  155. /// <inheritdoc />
  156. public virtual Task<DeleteResult> DeleteManyAsync(FilterDefinition<TDocument> filter, DeleteOptions options, CancellationToken cancellationToken = default(CancellationToken))
  157. {
  158. return DeleteManyAsync(filter, options, requests => BulkWriteAsync(requests, null, cancellationToken));
  159. }
  160. /// <inheritdoc />
  161. public virtual Task<DeleteResult> DeleteManyAsync(IClientSessionHandle session, FilterDefinition<TDocument> filter, DeleteOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  162. {
  163. return DeleteManyAsync(filter, options, requests => BulkWriteAsync(session, requests, null, cancellationToken));
  164. }
  165. private async Task<DeleteResult> DeleteManyAsync(FilterDefinition<TDocument> filter, DeleteOptions options, Func<IEnumerable<WriteModel<TDocument>>, Task<BulkWriteResult<TDocument>>> bulkWriteFuncAsync)
  166. {
  167. Ensure.IsNotNull(filter, nameof(filter));
  168. options = options ?? new DeleteOptions();
  169. var model = new DeleteManyModel<TDocument>(filter)
  170. {
  171. Collation = options.Collation
  172. };
  173. try
  174. {
  175. var result = await bulkWriteFuncAsync(new[] { model }).ConfigureAwait(false);
  176. return DeleteResult.FromCore(result);
  177. }
  178. catch (MongoBulkWriteException<TDocument> ex)
  179. {
  180. throw MongoWriteException.FromBulkWriteException(ex);
  181. }
  182. }
  183. /// <inheritdoc />
  184. public virtual DeleteResult DeleteOne(FilterDefinition<TDocument> filter, CancellationToken cancellationToken = default(CancellationToken))
  185. {
  186. return DeleteOne(filter, null, cancellationToken);
  187. }
  188. /// <inheritdoc />
  189. public virtual DeleteResult DeleteOne(FilterDefinition<TDocument> filter, DeleteOptions options, CancellationToken cancellationToken = default(CancellationToken))
  190. {
  191. return DeleteOne(filter, options, requests => BulkWrite(requests, null, cancellationToken));
  192. }
  193. /// <inheritdoc />
  194. public virtual DeleteResult DeleteOne(IClientSessionHandle session, FilterDefinition<TDocument> filter, DeleteOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  195. {
  196. return DeleteOne(filter, options, requests => BulkWrite(session, requests, null, cancellationToken));
  197. }
  198. private DeleteResult DeleteOne(FilterDefinition<TDocument> filter, DeleteOptions options, Func<IEnumerable<WriteModel<TDocument>>, BulkWriteResult> bulkWrite)
  199. {
  200. Ensure.IsNotNull(filter, nameof(filter));
  201. options = options ?? new DeleteOptions();
  202. var model = new DeleteOneModel<TDocument>(filter)
  203. {
  204. Collation = options.Collation
  205. };
  206. try
  207. {
  208. var result = bulkWrite(new[] { model });
  209. return DeleteResult.FromCore(result);
  210. }
  211. catch (MongoBulkWriteException<TDocument> ex)
  212. {
  213. throw MongoWriteException.FromBulkWriteException(ex);
  214. }
  215. }
  216. /// <inheritdoc />
  217. public virtual Task<DeleteResult> DeleteOneAsync(FilterDefinition<TDocument> filter, CancellationToken cancellationToken = default(CancellationToken))
  218. {
  219. return DeleteOneAsync(filter, null, cancellationToken);
  220. }
  221. /// <inheritdoc />
  222. public virtual Task<DeleteResult> DeleteOneAsync(FilterDefinition<TDocument> filter, DeleteOptions options, CancellationToken cancellationToken = default(CancellationToken))
  223. {
  224. return DeleteOneAsync(filter, options, requests => BulkWriteAsync(requests, null, cancellationToken));
  225. }
  226. /// <inheritdoc />
  227. public virtual Task<DeleteResult> DeleteOneAsync(IClientSessionHandle session, FilterDefinition<TDocument> filter, DeleteOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  228. {
  229. return DeleteOneAsync(filter, options, requests => BulkWriteAsync(session, requests, null, cancellationToken));
  230. }
  231. private async Task<DeleteResult> DeleteOneAsync(FilterDefinition<TDocument> filter, DeleteOptions options, Func<IEnumerable<WriteModel<TDocument>>, Task<BulkWriteResult<TDocument>>> bulkWriteAsync)
  232. {
  233. Ensure.IsNotNull(filter, nameof(filter));
  234. options = options ?? new DeleteOptions();
  235. var model = new DeleteOneModel<TDocument>(filter)
  236. {
  237. Collation = options.Collation
  238. };
  239. try
  240. {
  241. var result = await bulkWriteAsync(new[] { model }).ConfigureAwait(false);
  242. return DeleteResult.FromCore(result);
  243. }
  244. catch (MongoBulkWriteException<TDocument> ex)
  245. {
  246. throw MongoWriteException.FromBulkWriteException(ex);
  247. }
  248. }
  249. /// <inheritdoc />
  250. public virtual IAsyncCursor<TField> Distinct<TField>(FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  251. {
  252. throw new NotImplementedException();
  253. }
  254. /// <inheritdoc />
  255. public virtual IAsyncCursor<TField> Distinct<TField>(IClientSessionHandle session, FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  256. {
  257. throw new NotImplementedException();
  258. }
  259. /// <inheritdoc />
  260. public abstract Task<IAsyncCursor<TField>> DistinctAsync<TField>(FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
  261. /// <inheritdoc />
  262. public virtual Task<IAsyncCursor<TField>> DistinctAsync<TField>(IClientSessionHandle session, FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  263. {
  264. throw new NotImplementedException();
  265. }
  266. /// <inheritdoc />
  267. public virtual long EstimatedDocumentCount(EstimatedDocumentCountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  268. {
  269. throw new NotImplementedException();
  270. }
  271. /// <inheritdoc />
  272. public virtual Task<long> EstimatedDocumentCountAsync(EstimatedDocumentCountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  273. {
  274. throw new NotImplementedException();
  275. }
  276. /// <inheritdoc />
  277. public virtual IAsyncCursor<TProjection> FindSync<TProjection>(FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
  278. {
  279. throw new NotImplementedException();
  280. }
  281. /// <inheritdoc />
  282. public virtual IAsyncCursor<TProjection> FindSync<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
  283. {
  284. throw new NotImplementedException();
  285. }
  286. /// <inheritdoc />
  287. public abstract Task<IAsyncCursor<TProjection>> FindAsync<TProjection>(FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken));
  288. /// <inheritdoc />
  289. public virtual Task<IAsyncCursor<TProjection>> FindAsync<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
  290. {
  291. throw new NotImplementedException();
  292. }
  293. /// <inheritdoc />
  294. public virtual TProjection FindOneAndDelete<TProjection>(FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
  295. {
  296. throw new NotImplementedException();
  297. }
  298. /// <inheritdoc />
  299. public virtual TProjection FindOneAndDelete<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
  300. {
  301. throw new NotImplementedException();
  302. }
  303. /// <inheritdoc />
  304. public abstract Task<TProjection> FindOneAndDeleteAsync<TProjection>(FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken));
  305. /// <inheritdoc />
  306. public virtual Task<TProjection> FindOneAndDeleteAsync<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
  307. {
  308. throw new NotImplementedException();
  309. }
  310. /// <inheritdoc />
  311. public virtual TProjection FindOneAndReplace<TProjection>(FilterDefinition<TDocument> filter, TDocument replacement, FindOneAndReplaceOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
  312. {
  313. throw new NotImplementedException();
  314. }
  315. /// <inheritdoc />
  316. public virtual TProjection FindOneAndReplace<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, TDocument replacement, FindOneAndReplaceOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
  317. {
  318. throw new NotImplementedException();
  319. }
  320. /// <inheritdoc />
  321. public abstract Task<TProjection> FindOneAndReplaceAsync<TProjection>(FilterDefinition<TDocument> filter, TDocument replacement, FindOneAndReplaceOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken));
  322. /// <inheritdoc />
  323. public virtual Task<TProjection> FindOneAndReplaceAsync<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, TDocument replacement, FindOneAndReplaceOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
  324. {
  325. throw new NotImplementedException();
  326. }
  327. /// <inheritdoc />
  328. public virtual TProjection FindOneAndUpdate<TProjection>(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
  329. {
  330. throw new NotImplementedException();
  331. }
  332. /// <inheritdoc />
  333. public virtual TProjection FindOneAndUpdate<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
  334. {
  335. throw new NotImplementedException();
  336. }
  337. /// <inheritdoc />
  338. public abstract Task<TProjection> FindOneAndUpdateAsync<TProjection>(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken));
  339. /// <inheritdoc />
  340. public virtual Task<TProjection> FindOneAndUpdateAsync<TProjection>(IClientSessionHandle session, FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
  341. {
  342. throw new NotImplementedException();
  343. }
  344. /// <inheritdoc />
  345. public virtual void InsertOne(TDocument document, InsertOneOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  346. {
  347. InsertOne(document, options, (requests, bulkWriteOptions) => BulkWrite(requests, bulkWriteOptions, cancellationToken));
  348. }
  349. /// <inheritdoc />
  350. public virtual void InsertOne(IClientSessionHandle session, TDocument document, InsertOneOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  351. {
  352. InsertOne(document, options, (requests, bulkWriteOptions) => BulkWrite(session, requests, bulkWriteOptions, cancellationToken));
  353. }
  354. private void InsertOne(TDocument document, InsertOneOptions options, Action<IEnumerable<WriteModel<TDocument>>, BulkWriteOptions> bulkWrite)
  355. {
  356. Ensure.IsNotNull((object)document, "document");
  357. var model = new InsertOneModel<TDocument>(document);
  358. try
  359. {
  360. var bulkWriteOptions = options == null ? null : new BulkWriteOptions
  361. {
  362. BypassDocumentValidation = options.BypassDocumentValidation
  363. };
  364. bulkWrite(new[] { model }, bulkWriteOptions);
  365. }
  366. catch (MongoBulkWriteException<TDocument> ex)
  367. {
  368. throw MongoWriteException.FromBulkWriteException(ex);
  369. }
  370. }
  371. /// <inheritdoc />
  372. [Obsolete("Use the new overload of InsertOneAsync with an InsertOneOptions parameter instead.")]
  373. public virtual Task InsertOneAsync(TDocument document, CancellationToken _cancellationToken)
  374. {
  375. return InsertOneAsync(document, null, _cancellationToken);
  376. }
  377. /// <inheritdoc />
  378. public virtual Task InsertOneAsync(TDocument document, InsertOneOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  379. {
  380. return InsertOneAsync(document, options, (requests, bulkWriteOptions) => BulkWriteAsync(requests, bulkWriteOptions, cancellationToken));
  381. }
  382. /// <inheritdoc />
  383. public virtual Task InsertOneAsync(IClientSessionHandle session, TDocument document, InsertOneOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  384. {
  385. return InsertOneAsync(document, options, (requests, bulkWriteOptions) => BulkWriteAsync(session, requests, bulkWriteOptions, cancellationToken));
  386. }
  387. private async Task InsertOneAsync(TDocument document, InsertOneOptions options, Func<IEnumerable<WriteModel<TDocument>>, BulkWriteOptions, Task> bulkWriteAsync)
  388. {
  389. Ensure.IsNotNull((object)document, "document");
  390. var model = new InsertOneModel<TDocument>(document);
  391. try
  392. {
  393. var bulkWriteOptions = options == null ? null : new BulkWriteOptions
  394. {
  395. BypassDocumentValidation = options.BypassDocumentValidation
  396. };
  397. await bulkWriteAsync(new[] { model }, bulkWriteOptions).ConfigureAwait(false);
  398. }
  399. catch (MongoBulkWriteException<TDocument> ex)
  400. {
  401. throw MongoWriteException.FromBulkWriteException(ex);
  402. }
  403. }
  404. /// <inheritdoc />
  405. public virtual void InsertMany(IEnumerable<TDocument> documents, InsertManyOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  406. {
  407. InsertMany(documents, options, (requests, bulkWriteOptions) => BulkWrite(requests, bulkWriteOptions, cancellationToken));
  408. }
  409. /// <inheritdoc />
  410. public virtual void InsertMany(IClientSessionHandle session, IEnumerable<TDocument> documents, InsertManyOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  411. {
  412. InsertMany(documents, options, (requests, bulkWriteOptions) => BulkWrite(session, requests, bulkWriteOptions, cancellationToken));
  413. }
  414. private void InsertMany(IEnumerable<TDocument> documents, InsertManyOptions options, Action<IEnumerable<WriteModel<TDocument>>, BulkWriteOptions> bulkWrite)
  415. {
  416. Ensure.IsNotNull(documents, nameof(documents));
  417. var models = documents.Select(x => new InsertOneModel<TDocument>(x));
  418. BulkWriteOptions bulkWriteOptions = options == null ? null : new BulkWriteOptions
  419. {
  420. BypassDocumentValidation = options.BypassDocumentValidation,
  421. IsOrdered = options.IsOrdered
  422. };
  423. bulkWrite(models, bulkWriteOptions);
  424. }
  425. /// <inheritdoc />
  426. public virtual Task InsertManyAsync(IEnumerable<TDocument> documents, InsertManyOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  427. {
  428. return InsertManyAsync(documents, options, (requests, bulkWriteOptions) => BulkWriteAsync(requests, bulkWriteOptions, cancellationToken));
  429. }
  430. /// <inheritdoc />
  431. public virtual Task InsertManyAsync(IClientSessionHandle session, IEnumerable<TDocument> documents, InsertManyOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  432. {
  433. return InsertManyAsync(documents, options, (requests, bulkWriteOptions) => BulkWriteAsync(session, requests, bulkWriteOptions, cancellationToken));
  434. }
  435. private Task InsertManyAsync(IEnumerable<TDocument> documents, InsertManyOptions options, Func<IEnumerable<WriteModel<TDocument>>, BulkWriteOptions, Task> bulkWriteAsync)
  436. {
  437. Ensure.IsNotNull(documents, nameof(documents));
  438. var models = documents.Select(x => new InsertOneModel<TDocument>(x));
  439. var bulkWriteOptions = options == null ? null : new BulkWriteOptions
  440. {
  441. BypassDocumentValidation = options.BypassDocumentValidation,
  442. IsOrdered = options.IsOrdered
  443. };
  444. return bulkWriteAsync(models, bulkWriteOptions);
  445. }
  446. /// <inheritdoc />
  447. public virtual IAsyncCursor<TResult> MapReduce<TResult>(BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options = null, CancellationToken cancellationToken = default(CancellationToken))
  448. {
  449. throw new NotImplementedException();
  450. }
  451. /// <inheritdoc />
  452. public virtual IAsyncCursor<TResult> MapReduce<TResult>(IClientSessionHandle session, BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options = null, CancellationToken cancellationToken = default(CancellationToken))
  453. {
  454. throw new NotImplementedException();
  455. }
  456. /// <inheritdoc />
  457. public abstract Task<IAsyncCursor<TResult>> MapReduceAsync<TResult>(BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options = null, CancellationToken cancellationToken = default(CancellationToken));
  458. /// <inheritdoc />
  459. public virtual Task<IAsyncCursor<TResult>> MapReduceAsync<TResult>(IClientSessionHandle session, BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options = null, CancellationToken cancellationToken = default(CancellationToken))
  460. {
  461. throw new NotImplementedException();
  462. }
  463. /// <inheritdoc />
  464. public abstract IFilteredMongoCollection<TDerivedDocument> OfType<TDerivedDocument>() where TDerivedDocument : TDocument;
  465. /// <inheritdoc />
  466. public virtual ReplaceOneResult ReplaceOne(FilterDefinition<TDocument> filter, TDocument replacement, UpdateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  467. {
  468. return ReplaceOne(filter, replacement, options, (requests, bulkWriteOptions) => BulkWrite(requests, bulkWriteOptions, cancellationToken));
  469. }
  470. /// <inheritdoc />
  471. public virtual ReplaceOneResult ReplaceOne(IClientSessionHandle session, FilterDefinition<TDocument> filter, TDocument replacement, UpdateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  472. {
  473. return ReplaceOne(filter, replacement, options, (requests, bulkWriteOptions) => BulkWrite(session, requests, bulkWriteOptions, cancellationToken));
  474. }
  475. private ReplaceOneResult ReplaceOne(FilterDefinition<TDocument> filter, TDocument replacement, UpdateOptions options, Func<IEnumerable<WriteModel<TDocument>>, BulkWriteOptions, BulkWriteResult<TDocument>> bulkWrite)
  476. {
  477. Ensure.IsNotNull(filter, nameof(filter));
  478. Ensure.IsNotNull((object)replacement, "replacement");
  479. if (options?.ArrayFilters != null)
  480. {
  481. throw new ArgumentException("ArrayFilters cannot be used with ReplaceOne.", nameof(options));
  482. }
  483. options = options ?? new UpdateOptions();
  484. var model = new ReplaceOneModel<TDocument>(filter, replacement)
  485. {
  486. Collation = options.Collation,
  487. IsUpsert = options.IsUpsert
  488. };
  489. try
  490. {
  491. var bulkWriteOptions = new BulkWriteOptions
  492. {
  493. BypassDocumentValidation = options.BypassDocumentValidation
  494. };
  495. var result = bulkWrite(new[] { model }, bulkWriteOptions);
  496. return ReplaceOneResult.FromCore(result);
  497. }
  498. catch (MongoBulkWriteException<TDocument> ex)
  499. {
  500. throw MongoWriteException.FromBulkWriteException(ex);
  501. }
  502. }
  503. /// <inheritdoc />
  504. public virtual Task<ReplaceOneResult> ReplaceOneAsync(FilterDefinition<TDocument> filter, TDocument replacement, UpdateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  505. {
  506. return ReplaceOneAsync(filter, replacement, options, (requests, bulkWriteOptions) => BulkWriteAsync(requests, bulkWriteOptions, cancellationToken));
  507. }
  508. /// <inheritdoc />
  509. public virtual Task<ReplaceOneResult> ReplaceOneAsync(IClientSessionHandle session, FilterDefinition<TDocument> filter, TDocument replacement, UpdateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  510. {
  511. return ReplaceOneAsync(filter, replacement, options, (requests, bulkWriteOptions) => BulkWriteAsync(session, requests, bulkWriteOptions, cancellationToken));
  512. }
  513. private async Task<ReplaceOneResult> ReplaceOneAsync(FilterDefinition<TDocument> filter, TDocument replacement, UpdateOptions options, Func<IEnumerable<WriteModel<TDocument>>, BulkWriteOptions, Task<BulkWriteResult<TDocument>>> bulkWriteAsync)
  514. {
  515. Ensure.IsNotNull(filter, nameof(filter));
  516. Ensure.IsNotNull((object)replacement, "replacement");
  517. if (options?.ArrayFilters != null)
  518. {
  519. throw new ArgumentException("ArrayFilters cannot be used with ReplaceOne.", nameof(options));
  520. }
  521. options = options ?? new UpdateOptions();
  522. var model = new ReplaceOneModel<TDocument>(filter, replacement)
  523. {
  524. Collation = options.Collation,
  525. IsUpsert = options.IsUpsert
  526. };
  527. try
  528. {
  529. var bulkWriteOptions = new BulkWriteOptions
  530. {
  531. BypassDocumentValidation = options.BypassDocumentValidation
  532. };
  533. var result = await bulkWriteAsync(new[] { model }, bulkWriteOptions).ConfigureAwait(false);
  534. return ReplaceOneResult.FromCore(result);
  535. }
  536. catch (MongoBulkWriteException<TDocument> ex)
  537. {
  538. throw MongoWriteException.FromBulkWriteException(ex);
  539. }
  540. }
  541. /// <inheritdoc />
  542. public virtual UpdateResult UpdateMany(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, UpdateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  543. {
  544. return UpdateMany(filter, update, options, (requests, bulkWriteOptions) => BulkWrite(requests, bulkWriteOptions, cancellationToken));
  545. }
  546. /// <inheritdoc />
  547. public virtual UpdateResult UpdateMany(IClientSessionHandle session, FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, UpdateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  548. {
  549. return UpdateMany(filter, update, options, (requests, bulkWriteOptions) => BulkWrite(session, requests, bulkWriteOptions, cancellationToken));
  550. }
  551. private UpdateResult UpdateMany(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, UpdateOptions options, Func<IEnumerable<WriteModel<TDocument>>, BulkWriteOptions, BulkWriteResult<TDocument>> bulkWrite)
  552. {
  553. Ensure.IsNotNull(filter, nameof(filter));
  554. Ensure.IsNotNull(update, nameof(update));
  555. options = options ?? new UpdateOptions();
  556. var model = new UpdateManyModel<TDocument>(filter, update)
  557. {
  558. ArrayFilters = options.ArrayFilters,
  559. Collation = options.Collation,
  560. IsUpsert = options.IsUpsert
  561. };
  562. try
  563. {
  564. var bulkWriteOptions = new BulkWriteOptions
  565. {
  566. BypassDocumentValidation = options.BypassDocumentValidation
  567. };
  568. var result = bulkWrite(new[] { model }, bulkWriteOptions);
  569. return UpdateResult.FromCore(result);
  570. }
  571. catch (MongoBulkWriteException<TDocument> ex)
  572. {
  573. throw MongoWriteException.FromBulkWriteException(ex);
  574. }
  575. }
  576. /// <inheritdoc />
  577. public virtual Task<UpdateResult> UpdateManyAsync(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, UpdateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  578. {
  579. return UpdateManyAsync(filter, update, options, (requests, bulkWriteOptions) => BulkWriteAsync(requests, bulkWriteOptions, cancellationToken));
  580. }
  581. /// <inheritdoc />
  582. public virtual Task<UpdateResult> UpdateManyAsync(IClientSessionHandle session, FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, UpdateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  583. {
  584. return UpdateManyAsync(filter, update, options, (requests, bulkWriteOptions) => BulkWriteAsync(session, requests, bulkWriteOptions, cancellationToken));
  585. }
  586. private async Task<UpdateResult> UpdateManyAsync(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, UpdateOptions options, Func<IEnumerable<WriteModel<TDocument>>, BulkWriteOptions, Task<BulkWriteResult<TDocument>>> bulkWriteAsync)
  587. {
  588. Ensure.IsNotNull(filter, nameof(filter));
  589. Ensure.IsNotNull(update, nameof(update));
  590. options = options ?? new UpdateOptions();
  591. var model = new UpdateManyModel<TDocument>(filter, update)
  592. {
  593. ArrayFilters = options.ArrayFilters,
  594. Collation = options.Collation,
  595. IsUpsert = options.IsUpsert
  596. };
  597. try
  598. {
  599. var bulkWriteOptions = new BulkWriteOptions
  600. {
  601. BypassDocumentValidation = options.BypassDocumentValidation
  602. };
  603. var result = await bulkWriteAsync(new[] { model }, bulkWriteOptions).ConfigureAwait(false);
  604. return UpdateResult.FromCore(result);
  605. }
  606. catch (MongoBulkWriteException<TDocument> ex)
  607. {
  608. throw MongoWriteException.FromBulkWriteException(ex);
  609. }
  610. }
  611. /// <inheritdoc />
  612. public virtual UpdateResult UpdateOne(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, UpdateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  613. {
  614. return UpdateOne(filter, update, options, (requests, bulkWriteOptions) => BulkWrite(requests, bulkWriteOptions, cancellationToken));
  615. }
  616. /// <inheritdoc />
  617. public virtual UpdateResult UpdateOne(IClientSessionHandle session, FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, UpdateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  618. {
  619. return UpdateOne(filter, update, options, (requests, bulkWriteOptions) => BulkWrite(session, requests, bulkWriteOptions, cancellationToken));
  620. }
  621. private UpdateResult UpdateOne(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, UpdateOptions options, Func<IEnumerable<WriteModel<TDocument>>, BulkWriteOptions, BulkWriteResult<TDocument>> bulkWrite)
  622. {
  623. Ensure.IsNotNull(filter, nameof(filter));
  624. Ensure.IsNotNull(update, nameof(update));
  625. options = options ?? new UpdateOptions();
  626. var model = new UpdateOneModel<TDocument>(filter, update)
  627. {
  628. ArrayFilters = options.ArrayFilters,
  629. Collation = options.Collation,
  630. IsUpsert = options.IsUpsert
  631. };
  632. try
  633. {
  634. var bulkWriteOptions = new BulkWriteOptions
  635. {
  636. BypassDocumentValidation = options.BypassDocumentValidation
  637. };
  638. var result = bulkWrite(new[] { model }, bulkWriteOptions);
  639. return UpdateResult.FromCore(result);
  640. }
  641. catch (MongoBulkWriteException<TDocument> ex)
  642. {
  643. throw MongoWriteException.FromBulkWriteException(ex);
  644. }
  645. }
  646. /// <inheritdoc />
  647. public virtual Task<UpdateResult> UpdateOneAsync(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, UpdateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  648. {
  649. return UpdateOneAsync(filter, update, options, (requests, bulkWriteOptions) => BulkWriteAsync(requests, bulkWriteOptions, cancellationToken));
  650. }
  651. /// <inheritdoc />
  652. public virtual Task<UpdateResult> UpdateOneAsync(IClientSessionHandle session, FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, UpdateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
  653. {
  654. return UpdateOneAsync(filter, update, options, (requests, bulkWriteOptions) => BulkWriteAsync(session, requests, bulkWriteOptions, cancellationToken));
  655. }
  656. private async Task<UpdateResult> UpdateOneAsync(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, UpdateOptions options, Func<IEnumerable<WriteModel<TDocument>>, BulkWriteOptions, Task<BulkWriteResult<TDocument>>> bulkWriteAsync)
  657. {
  658. Ensure.IsNotNull(filter, nameof(filter));
  659. Ensure.IsNotNull(update, nameof(update));
  660. options = options ?? new UpdateOptions();
  661. var model = new UpdateOneModel<TDocument>(filter, update)
  662. {
  663. ArrayFilters = options.ArrayFilters,
  664. Collation = options.Collation,
  665. IsUpsert = options.IsUpsert
  666. };
  667. try
  668. {
  669. var bulkWriteOptions = new BulkWriteOptions
  670. {
  671. BypassDocumentValidation = options.BypassDocumentValidation
  672. };
  673. var result = await bulkWriteAsync(new[] { model }, bulkWriteOptions).ConfigureAwait(false);
  674. return UpdateResult.FromCore(result);
  675. }
  676. catch (MongoBulkWriteException<TDocument> ex)
  677. {
  678. throw MongoWriteException.FromBulkWriteException(ex);
  679. }
  680. }
  681. /// <inheritdoc />
  682. public virtual IAsyncCursor<TResult> Watch<TResult>(
  683. PipelineDefinition<ChangeStreamDocument<TDocument>, TResult> pipeline,
  684. ChangeStreamOptions options = null,
  685. CancellationToken cancellationToken = default(CancellationToken))
  686. {
  687. throw new NotImplementedException(); // implemented by subclasses
  688. }
  689. /// <inheritdoc />
  690. public virtual IAsyncCursor<TResult> Watch<TResult>(
  691. IClientSessionHandle session,
  692. PipelineDefinition<ChangeStreamDocument<TDocument>, TResult> pipeline,
  693. ChangeStreamOptions options = null,
  694. CancellationToken cancellationToken = default(CancellationToken))
  695. {
  696. throw new NotImplementedException(); // implemented by subclasses
  697. }
  698. /// <inheritdoc />
  699. public virtual Task<IAsyncCursor<TResult>> WatchAsync<TResult>(
  700. PipelineDefinition<ChangeStreamDocument<TDocument>, TResult> pipeline,
  701. ChangeStreamOptions options = null,
  702. CancellationToken cancellationToken = default(CancellationToken))
  703. {
  704. throw new NotImplementedException(); // implemented by subclasses
  705. }
  706. /// <inheritdoc />
  707. public virtual Task<IAsyncCursor<TResult>> WatchAsync<TResult>(
  708. IClientSessionHandle session,
  709. PipelineDefinition<ChangeStreamDocument<TDocument>, TResult> pipeline,
  710. ChangeStreamOptions options = null,
  711. CancellationToken cancellationToken = default(CancellationToken))
  712. {
  713. throw new NotImplementedException(); // implemented by subclasses
  714. }
  715. /// <inheritdoc />
  716. public virtual IMongoCollection<TDocument> WithReadConcern(ReadConcern readConcern)
  717. {
  718. throw new NotImplementedException();
  719. }
  720. /// <inheritdoc />
  721. public abstract IMongoCollection<TDocument> WithReadPreference(ReadPreference readPreference);
  722. /// <inheritdoc />
  723. public abstract IMongoCollection<TDocument> WithWriteConcern(WriteConcern writeConcern);
  724. }
  725. }