DBProxyComponentSystem.cs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq.Expressions;
  4. using System.Threading;
  5. using ETModel;
  6. using MongoDB.Bson;
  7. using MongoDB.Bson.Serialization;
  8. using MongoDB.Driver;
  9. namespace ETHotfix
  10. {
  /// <summary>
  /// Awake system for <see cref="DBProxyComponent"/>: its Awake override simply
  /// delegates to <c>self.Awake()</c>.
  /// </summary>
  [ObjectSystem]
  public class DbProxyComponentSystem : AwakeSystem<DBProxyComponent>
  {
      /// <summary>Runs the component's own Awake logic.</summary>
      /// <param name="self">The component being awakened.</param>
      public override void Awake(DBProxyComponent self) => self.Awake();
  }
  /// <summary>
  /// Extension methods through which DBProxyComponent proxies database operations.
  /// </summary>
  22. public static class DBProxyComponentEx
  23. {
  /// <summary>
  /// Reads the DB start config's <c>InnerConfig</c> and stores its endpoint in
  /// <c>self.dbAddress</c> for later requests.
  /// </summary>
  /// <param name="self">Component whose <c>dbAddress</c> is assigned.</param>
  public static void Awake(this DBProxyComponent self)
  {
      InnerConfig innerConfig = StartConfigComponent.Instance.DBConfig.GetComponent<InnerConfig>();
      self.dbAddress = innerConfig.IPEndPoint;
  }
  /// <summary>
  /// Sends a <c>DBSaveRequest</c> carrying <paramref name="component"/> over the inner
  /// session for <c>self.dbAddress</c> and awaits the call.
  /// </summary>
  /// <param name="self">Proxy component providing the DB address.</param>
  /// <param name="component">Component to save.</param>
  public static async ETTask Save(this DBProxyComponent self, ComponentWithId component)
  {
      NetInnerComponent netInner = Game.Scene.GetComponent<NetInnerComponent>();
      Session dbSession = netInner.Get(self.dbAddress);
      var request = new DBSaveRequest { Component = component };
      await dbSession.Call(request);
  }
  /// <summary>
  /// Sends a single <c>DBSaveBatchRequest</c> containing <paramref name="components"/> over
  /// the inner session for <c>self.dbAddress</c> and awaits the call.
  /// </summary>
  /// <param name="self">Proxy component providing the DB address.</param>
  /// <param name="components">Components to save in one request.</param>
  public static async ETTask SaveBatch(this DBProxyComponent self, List<ComponentWithId> components)
  {
      NetInnerComponent netInner = Game.Scene.GetComponent<NetInnerComponent>();
      Session dbSession = netInner.Get(self.dbAddress);
      var request = new DBSaveBatchRequest { Components = components };
      await dbSession.Call(request);
  }
  /// <summary>
  /// Same as the two-argument <c>Save</c>, but passes <paramref name="cancellationToken"/>
  /// through to <c>Session.Call</c>.
  /// </summary>
  /// <param name="self">Proxy component providing the DB address.</param>
  /// <param name="component">Component to save.</param>
  /// <param name="cancellationToken">Token handed to the session call.</param>
  public static async ETTask Save(this DBProxyComponent self, ComponentWithId component, CancellationToken cancellationToken)
  {
      NetInnerComponent netInner = Game.Scene.GetComponent<NetInnerComponent>();
      Session dbSession = netInner.Get(self.dbAddress);
      var request = new DBSaveRequest { Component = component };
      await dbSession.Call(request, cancellationToken);
  }
  /// <summary>
  /// Sends a <c>DBSaveRequest</c> for <paramref name="component"/> with the collection name
  /// set to "Log", over the inner session for <c>self.dbAddress</c>.
  /// </summary>
  /// <param name="self">Proxy component providing the DB address.</param>
  /// <param name="component">Log component to save.</param>
  public static async ETVoid SaveLog(this DBProxyComponent self, ComponentWithId component)
  {
      NetInnerComponent netInner = Game.Scene.GetComponent<NetInnerComponent>();
      Session dbSession = netInner.Get(self.dbAddress);
      var request = new DBSaveRequest { Component = component, CollectionName = "Log" };
      await dbSession.Call(request);
  }
  /// <summary>
  /// Queries a single component of type <typeparamref name="T"/> by id.
  /// Calls made while a request for the same key is still pending share that request:
  /// only the first caller starts <c>QueryInner</c>; later callers just enqueue their
  /// completion source under the key and are completed when the pending request finishes.
  /// </summary>
  /// <typeparam name="T">Component type; its <c>Name</c> is the collection name and part of the key.</typeparam>
  /// <param name="self">Proxy component holding the pending-request queue (<c>TcsQueue</c>).</param>
  /// <param name="id">Id of the component to load.</param>
  /// <returns>A task completed by <c>QueryInner</c> with the queried component, or with the exception it caught.</returns>
  public static ETTask<T> Query<T>(this DBProxyComponent self, long id) where T: ComponentWithId
  {
      // The separator keeps (type name, id) pairs distinct. Plain concatenation maps e.g.
      // ("Item1", 2) and ("Item", 12) to the same key "Item12"; waiters of different T would
      // then share one queue, and QueryInner's cast to ETTaskCompletionSource<T> fails for
      // the mismatched ones.
      string key = typeof (T).Name + ":" + id;
      ETTaskCompletionSource<T> tcs = new ETTaskCompletionSource<T>();

      // Check before enqueueing: only the first waiter for a key issues the request.
      bool requestPending = self.TcsQueue.ContainsKey(key);
      self.TcsQueue.Add(key, tcs);
      if (!requestPending)
      {
          self.QueryInner<T>(id, key).Coroutine();
      }
      return tcs.Task;
  }
  /// <summary>
  /// Issues the by-id <c>DBQueryRequest</c> and then completes every completion source
  /// queued under <paramref name="key"/>, with the result on success or with the caught
  /// exception on failure.
  /// </summary>
  /// <typeparam name="T">Component type; its <c>Name</c> is sent as the collection name.</typeparam>
  /// <param name="self">Proxy component holding <c>TcsQueue</c> and the DB address.</param>
  /// <param name="id">Id of the component to load.</param>
  /// <param name="key">Queue key under which the waiting completion sources were added.</param>
  private static async ETVoid QueryInner<T>(this DBProxyComponent self, long id, string key) where T: ComponentWithId
  {
      try
      {
          NetInnerComponent netInner = Game.Scene.GetComponent<NetInnerComponent>();
          Session dbSession = netInner.Get(self.dbAddress);
          var request = new DBQueryRequest { CollectionName = typeof(T).Name, Id = id };
          var response = (DBQueryResponse)await dbSession.Call(request);
          T component = (T)response.Component;

          // Fetch the waiters and remove the key before completing them.
          object[] waiters = self.TcsQueue.GetAll(key);
          self.TcsQueue.Remove(key);
          foreach (object waiter in waiters)
          {
              ((ETTaskCompletionSource<T>)waiter).SetResult(component);
          }
      }
      catch (Exception error)
      {
          // Propagate the failure to whatever is still queued under the key.
          object[] waiters = self.TcsQueue.GetAll(key);
          self.TcsQueue.Remove(key);
          foreach (object waiter in waiters)
          {
              ((ETTaskCompletionSource<T>)waiter).SetException(error);
          }
      }
  }
  /// <summary>
  /// Queries by a predicate expression. The expression is rendered to a JSON filter
  /// using the global BSON serializer registry and then passed to the JSON-based
  /// <c>Query</c> overload.
  /// </summary>
  /// <param name="self">Proxy component used to run the query.</param>
  /// <param name="exp">Predicate over <typeparamref name="T"/> to convert into a filter.</param>
  /// <typeparam name="T">Component type being queried.</typeparam>
  /// <returns>The components returned by the JSON-based query.</returns>
  public static async ETTask<List<ComponentWithId>> Query<T>(this DBProxyComponent self, Expression<Func<T ,bool>> exp) where T: ComponentWithId
  {
      var filterDefinition = new ExpressionFilterDefinition<T>(exp);
      IBsonSerializerRegistry registry = BsonSerializer.SerializerRegistry;
      IBsonSerializer<T> serializer = registry.GetSerializer<T>();
      BsonDocument rendered = filterDefinition.Render(serializer, registry);
      return await self.Query<T>(rendered.ToJson());
  }
  /// <summary>
  /// Queries several components in one <c>DBQueryBatchRequest</c>, using
  /// <c>typeof(T).Name</c> as the collection name.
  /// </summary>
  /// <typeparam name="T">Component type whose name selects the collection.</typeparam>
  /// <param name="self">Proxy component providing the DB address.</param>
  /// <param name="ids">Ids sent as the request's <c>IdList</c>.</param>
  /// <returns>The <c>Components</c> list of the batch response.</returns>
  public static async ETTask<List<ComponentWithId>> Query<T>(this DBProxyComponent self, List<long> ids) where T : ComponentWithId
  {
      NetInnerComponent netInner = Game.Scene.GetComponent<NetInnerComponent>();
      Session dbSession = netInner.Get(self.dbAddress);
      var request = new DBQueryBatchRequest { CollectionName = typeof(T).Name, IdList = ids };
      var response = (DBQueryBatchResponse)await dbSession.Call(request);
      return response.Components;
  }
  /// <summary>
  /// Queries by a JSON filter condition. Calls made while a request for the same
  /// type name and JSON is still pending share that request: only the first caller
  /// starts <c>QueryInner</c>; later callers only enqueue their completion source.
  /// </summary>
  /// <param name="self">Proxy component holding the pending-request queue (<c>TcsQueue</c>).</param>
  /// <param name="json">JSON filter sent with the request; also part of the queue key.</param>
  /// <typeparam name="T">Component type whose name selects the collection.</typeparam>
  /// <returns>A task completed by <c>QueryInner</c> with the matching components, or with the exception it caught.</returns>
  public static ETTask<List<ComponentWithId>> Query<T>(this DBProxyComponent self, string json) where T : ComponentWithId
  {
      string key = typeof (T).Name + json;
      var waiter = new ETTaskCompletionSource<List<ComponentWithId>>();

      // Check before enqueueing: only the first waiter for a key issues the request.
      bool requestPending = self.TcsQueue.ContainsKey(key);
      self.TcsQueue.Add(key, waiter);
      if (!requestPending)
      {
          self.QueryInner<T>(json, key).Coroutine();
      }
      return waiter.Task;
  }
  /// <summary>
  /// Issues the <c>DBQueryJsonRequest</c> and then completes every completion source
  /// queued under <paramref name="key"/>, with the response's components on success or
  /// with the caught exception on failure.
  /// </summary>
  /// <typeparam name="T">Component type; its <c>Name</c> is sent as the collection name.</typeparam>
  /// <param name="self">Proxy component holding <c>TcsQueue</c> and the DB address.</param>
  /// <param name="json">JSON filter sent with the request.</param>
  /// <param name="key">Queue key under which the waiting completion sources were added.</param>
  private static async ETVoid QueryInner<T>(this DBProxyComponent self, string json, string key) where T : ComponentWithId
  {
      try
      {
          NetInnerComponent netInner = Game.Scene.GetComponent<NetInnerComponent>();
          Session dbSession = netInner.Get(self.dbAddress);
          var request = new DBQueryJsonRequest { CollectionName = typeof(T).Name, Json = json };
          var response = (DBQueryJsonResponse)await dbSession.Call(request);
          var components = response.Components;

          // Fetch the waiters and remove the key before completing them.
          object[] waiters = self.TcsQueue.GetAll(key);
          self.TcsQueue.Remove(key);
          foreach (object waiter in waiters)
          {
              ((ETTaskCompletionSource<List<ComponentWithId>>)waiter).SetResult(components);
          }
      }
      catch (Exception error)
      {
          // Propagate the failure to whatever is still queued under the key.
          object[] waiters = self.TcsQueue.GetAll(key);
          self.TcsQueue.Remove(key);
          foreach (object waiter in waiters)
          {
              ((ETTaskCompletionSource<List<ComponentWithId>>)waiter).SetException(error);
          }
      }
  }
  151. }
  152. }