ActorProxy.cs 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Net;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. namespace Model
  7. {
  /// <summary>
  /// Object-system hook that forwards the Awake notification for an
  /// <see cref="ActorProxy"/> to the component's own <c>Awake()</c> method.
  /// </summary>
  [ObjectSystem]
  public class ActorProxyAwakeSystem : AwakeSystem<ActorProxy>
  {
      /// <summary>Delegates to <c>self.Awake()</c>.</summary>
      /// <param name="self">The proxy being awakened.</param>
      public override void Awake(ActorProxy self) => self.Awake();
  }
  /// <summary>
  /// Object-system hook that forwards the Start notification for an
  /// <see cref="ActorProxy"/> to the component's own <c>Start()</c> method.
  /// </summary>
  [ObjectSystem]
  public class ActorProxyStartSystem : StartSystem<ActorProxy>
  {
      /// <summary>Delegates to <c>self.Start()</c>.</summary>
      /// <param name="self">The proxy being started.</param>
      public override void Start(ActorProxy self) => self.Start();
  }
  /// <summary>
  /// Local proxy for a single remote actor. Messages handed to <see cref="Send"/> or
  /// <see cref="Call"/> are queued in <see cref="WaitingTasks"/>; a pump started by
  /// <see cref="Start"/> moves them to <see cref="RunningTasks"/> and runs them, never
  /// keeping more than <see cref="WindowSize"/> tasks in flight. When a response carries
  /// <c>ErrorCode.ERR_NotFoundActor</c> the actor's address is looked up again and the
  /// outstanding tasks are retried; once the failure counter exceeds 3 every queued task
  /// is failed and the proxy removes itself from <c>ActorProxyComponent</c>.
  /// </summary>
  public sealed class ActorProxy : Component
  {
      // Address of the actor.
      public IPEndPoint Address;

      // Messages that have been sent and are waiting for a response.
      public Queue<ActorTask> RunningTasks = new Queue<ActorTask>();

      // Messages that have not been sent yet.
      public Queue<ActorTask> WaitingTasks = new Queue<ActorTask>();

      // Send window size: maximum number of tasks allowed in RunningTasks at once.
      public int WindowSize = 1;

      // Maximum window size.
      public const int MaxWindowSize = 1;

      // Time of the most recent send (set from TimeHelper.Now()).
      public long LastSendTime;

      // Non-null only while the pump (UpdateAsync) is parked waiting for the next task.
      private TaskCompletionSource<ActorTask> tcs;

      // Cancelled when the actor is reported as not found, then replaced before retrying.
      public CancellationTokenSource CancellationTokenSource;

      // Number of consecutive ERR_NotFoundActor responses; > 0 means a retry is in progress.
      private int failTimes;

      /// <summary>
      /// Resets the send state and creates a fresh cancellation source.
      /// </summary>
      public void Awake()
      {
          this.LastSendTime = TimeHelper.Now();
          this.WindowSize = 1;
          this.tcs = null;
          this.CancellationTokenSource = new CancellationTokenSource();
      }

      /// <summary>
      /// Disposes the proxy: clears both queues, resets the failure counter and wakes the
      /// pump (if it is parked) so that it can observe <c>IsDisposed</c> and exit.
      /// </summary>
      public override void Dispose()
      {
          if (this.IsDisposed)
          {
              return;
          }

          base.Dispose();

          this.LastSendTime = 0;
          this.Address = null;
          this.RunningTasks.Clear();
          this.WaitingTasks.Clear();
          this.failTimes = 0;

          // Detach the completion source before completing it: SetResult may run the
          // pump's continuation inline, and that continuation must not see a stale tcs.
          var t = this.tcs;
          this.tcs = null;
          t?.SetResult(new ActorTask());
      }

      /// <summary>
      /// Resolves the actor's address and starts the send pump. Failures are logged,
      /// matching the other fire-and-forget methods of this class.
      /// </summary>
      public async void Start()
      {
          try
          {
              IPEndPoint address = await this.GetActorAddress();

              // The proxy may have been disposed while the lookup was pending.
              if (this.IsDisposed)
              {
                  return;
              }

              this.Address = address;
              this.UpdateAsync();
          }
          catch (Exception e)
          {
              Log.Error(e.ToString());
          }
      }

      /// <summary>
      /// Asks <c>LocationProxyComponent</c> which app hosts this actor and returns that
      /// app's <c>InnerConfig.IPEndPoint</c> from <c>StartConfigComponent</c>.
      /// </summary>
      private async Task<IPEndPoint> GetActorAddress()
      {
          int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(this.Id);
          return Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().IPEndPoint;
      }

      /// <summary>
      /// Queues a task for sending and, unless a retry is in progress, tries to hand it
      /// to the pump right away.
      /// </summary>
      /// <exception cref="Exception">The proxy has already been disposed.</exception>
      private void Add(ActorTask task)
      {
          if (this.IsDisposed)
          {
              throw new Exception("ActorProxy Disposed! dont hold actorproxy");
          }

          this.WaitingTasks.Enqueue(task);

          // failTimes > 0 means a retry is in progress; nothing may be moved to the
          // running queue until the retry path releases the pump itself.
          if (this.failTimes == 0)
          {
              this.AllowGet();
          }
      }

      /// <summary>
      /// Releases one waiting task to the parked pump, provided the pump is parked, a
      /// task is waiting and the send window has room. Otherwise does nothing.
      /// </summary>
      private void AllowGet()
      {
          if (this.tcs == null || this.WaitingTasks.Count <= 0 || this.RunningTasks.Count >= this.WindowSize)
          {
              return;
          }

          ActorTask task = this.WaitingTasks.Dequeue();
          this.RunningTasks.Enqueue(task);

          // Clear the field before SetResult: the pump's continuation may run inline and
          // install a new completion source through GetAsync.
          var t = this.tcs;
          this.tcs = null;
          t.SetResult(task);
      }

      /// <summary>
      /// Returns the next task for the pump. A waiting task is handed out immediately only
      /// when no retry is in progress and the send window has room (the same conditions
      /// <see cref="Add"/> and <see cref="AllowGet"/> apply); otherwise the returned task
      /// stays pending until <see cref="AllowGet"/> or <see cref="Dispose"/> completes it.
      /// </summary>
      private Task<ActorTask> GetAsync()
      {
          bool canSendNow = this.failTimes == 0
                  && this.WaitingTasks.Count > 0
                  && this.RunningTasks.Count < this.WindowSize;
          if (canSendNow)
          {
              ActorTask task = this.WaitingTasks.Dequeue();
              this.RunningTasks.Enqueue(task);
              return Task.FromResult(task);
          }

          this.tcs = new TaskCompletionSource<ActorTask>();
          return this.tcs.Task;
      }

      /// <summary>
      /// Send pump: repeatedly obtains the next task and starts it, until the proxy is
      /// disposed.
      /// </summary>
      private async void UpdateAsync()
      {
          while (true)
          {
              ActorTask actorTask = await this.GetAsync();
              if (this.IsDisposed)
              {
                  return;
              }

              try
              {
                  // Fire and forget: RunTask reports completion back through AllowGet.
                  this.RunTask(actorTask);
              }
              catch (Exception e)
              {
                  Log.Error(e.ToString());
                  return;
              }
          }
      }

      /// <summary>
      /// Runs one task and processes its response: on <c>ERR_NotFoundActor</c> starts the
      /// retry sequence (or gives up once the failure counter exceeds 3); otherwise records
      /// the success, removes the task from the running queue and releases the next one.
      /// </summary>
      private async void RunTask(ActorTask task)
      {
          try
          {
              IResponse response = await task.Run();

              // Dispose clears both queues; touching them for a response that arrives
              // afterwards would only fail (e.g. Dequeue on an empty queue).
              if (this.IsDisposed)
              {
                  return;
              }

              // Actor not found: shrink the send window to 1 and retry.
              if (response.Error == ErrorCode.ERR_NotFoundActor)
              {
                  this.CancellationTokenSource.Cancel();
                  this.WindowSize = 1;
                  ++this.failTimes;

                  // Append the waiting tasks behind the running ones, then swap the queues:
                  // everything ends up waiting again, in its original order.
                  while (this.WaitingTasks.Count > 0)
                  {
                      ActorTask actorTask = this.WaitingTasks.Dequeue();
                      this.RunningTasks.Enqueue(actorTask);
                  }
                  ObjectHelper.Swap(ref this.RunningTasks, ref this.WaitingTasks);

                  // Too many failures: fail every queued task and drop this proxy.
                  if (this.failTimes > 3)
                  {
                      while (this.WaitingTasks.Count > 0)
                      {
                          ActorTask actorTask = this.WaitingTasks.Dequeue();
                          actorTask.RunFail(response.Error);
                      }

                      // Remove this actor proxy outright.
                      Game.Scene.GetComponent<ActorProxyComponent>().Remove(this.Id);
                      return;
                  }

                  // Wait a while before sending again; the delay grows with each failure.
                  await Game.Scene.GetComponent<TimerComponent>().WaitAsync(this.failTimes * 500);
                  if (this.IsDisposed)
                  {
                      return;
                  }

                  IPEndPoint address = await this.GetActorAddress();
                  if (this.IsDisposed)
                  {
                      return;
                  }

                  this.Address = address;
                  this.CancellationTokenSource = new CancellationTokenSource();
                  this.AllowGet();
                  return;
              }

              // Sent successfully.
              this.LastSendTime = TimeHelper.Now();
              this.failTimes = 0;
              if (this.WindowSize < MaxWindowSize)
              {
                  ++this.WindowSize;
              }

              this.RunningTasks.Dequeue();
              this.AllowGet();
          }
          catch (Exception e)
          {
              // NOTE(review): the failed task is left in RunningTasks and the pump is not
              // released here; confirm whether this path should also dequeue or fail the task.
              Log.Error(e.ToString());
          }
      }

      /// <summary>
      /// Queues a one-way message for the actor; no response is reported to the caller.
      /// </summary>
      /// <param name="message">The message to send; cast to <c>MessageObject</c>.</param>
      public void Send(IMessage message)
      {
          ActorTask task = new ActorTask
          {
              message = (MessageObject)message,
              proxy = this
          };
          this.Add(task);
      }

      /// <summary>
      /// Queues a request for the actor and returns the task of the
      /// <c>TaskCompletionSource</c> attached to the queued <c>ActorTask</c>.
      /// </summary>
      /// <param name="request">The request to send; cast to <c>MessageObject</c>.</param>
      /// <returns>The task exposed by the queued task's completion source.</returns>
      public Task<IResponse> Call(IRequest request)
      {
          ActorTask task = new ActorTask
          {
              message = (MessageObject)request,
              proxy = this,
              Tcs = new TaskCompletionSource<IResponse>()
          };
          this.Add(task);
          return task.Tcs.Task;
      }

      /// <summary>
      /// Stamps the request with this proxy's id and performs the call over the inner
      /// network session for <see cref="Address"/>.
      /// </summary>
      /// <param name="request">The request to send; its <c>Id</c> is overwritten.</param>
      /// <param name="cancellationToken">Token passed through to the session call.</param>
      /// <returns>The response returned by the session.</returns>
      /// <exception cref="RpcException">Logged together with the address, then rethrown.</exception>
      public async Task<IResponse> RealCall(ActorRequest request, CancellationToken cancellationToken)
      {
          try
          {
              //Log.Debug($"realcall {MongoHelper.ToJson(request)} {this.Address}");
              request.Id = this.Id;
              Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.Address);
              IResponse response = await session.Call(request, cancellationToken);
              return response;
          }
          catch (RpcException e)
          {
              Log.Error($"{this.Address} {e}");
              throw;
          }
      }

      /// <summary>
      /// Builds a debug string listing the message type name of every task in the queue,
      /// each preceded by a single space.
      /// </summary>
      /// <param name="tasks">The queue to describe; <c>null</c> yields an empty string.</param>
      /// <returns>The concatenated type names, or an empty string for an empty or null queue.</returns>
      public string DebugQueue(Queue<ActorTask> tasks)
      {
          if (tasks == null)
          {
              return "";
          }

          // Fully qualified so that no additional using directive is required.
          var builder = new System.Text.StringBuilder();
          foreach (ActorTask task in tasks)
          {
              builder.Append(' ').Append(task.message.GetType().Name);
          }
          return builder.ToString();
      }
  }
  220. }