ActorProxy.cs 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. namespace Model
  6. {
  7. public abstract class ActorTask
  8. {
  9. public abstract Task<AResponse> Run();
  10. public abstract void RunFail(int error);
  11. }
  12. /// <summary>
  13. /// 普通消息,不需要response
  14. /// </summary>
  15. public class ActorMessageTask: ActorTask
  16. {
  17. private readonly ActorProxy proxy;
  18. private readonly AMessage message;
  19. public ActorMessageTask(ActorProxy proxy, AMessage message)
  20. {
  21. this.proxy = proxy;
  22. this.message = message;
  23. }
  24. public override async Task<AResponse> Run()
  25. {
  26. ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message };
  27. ActorResponse response = await this.proxy.RealCall<ActorResponse>(request, this.proxy.CancellationTokenSource.Token);
  28. return response;
  29. }
  30. public override void RunFail(int error)
  31. {
  32. }
  33. }
  34. /// <summary>
  35. /// Rpc消息,需要等待返回
  36. /// </summary>
  37. /// <typeparam name="Response"></typeparam>
  38. public class ActorRpcTask<Response> : ActorTask where Response: AResponse
  39. {
  40. private readonly ActorProxy proxy;
  41. private readonly ARequest message;
  42. public readonly TaskCompletionSource<Response> Tcs = new TaskCompletionSource<Response>();
  43. public ActorRpcTask(ActorProxy proxy, ARequest message)
  44. {
  45. this.proxy = proxy;
  46. this.message = message;
  47. }
  48. public override async Task<AResponse> Run()
  49. {
  50. ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message };
  51. Response response = await this.proxy.RealCall<Response>(request, this.proxy.CancellationTokenSource.Token);
  52. if (response.Error != ErrorCode.ERR_NotFoundActor)
  53. {
  54. this.Tcs.SetResult(response);
  55. }
  56. return response;
  57. }
  58. public override void RunFail(int error)
  59. {
  60. this.Tcs.SetException(new RpcException(error, ""));
  61. }
  62. }
  63. [ObjectEvent]
  64. public class ActorProxyEvent : ObjectEvent<ActorProxy>, IAwake, IStart
  65. {
  66. public void Awake()
  67. {
  68. this.Get().Awake();
  69. }
  70. public void Start()
  71. {
  72. this.Get().Start();
  73. }
  74. }
  75. public sealed class ActorProxy : Entity
  76. {
  77. // actor的地址
  78. public string Address;
  79. // 已发送等待回应的消息
  80. public Queue<ActorTask> RunningTasks;
  81. // 还没发送的消息
  82. public Queue<ActorTask> WaitingTasks;
  83. // 发送窗口大小
  84. public int WindowSize = 1;
  85. // 最大窗口
  86. public const int MaxWindowSize = 100;
  87. private TaskCompletionSource<ActorTask> tcs;
  88. public CancellationTokenSource CancellationTokenSource;
  89. private int failTimes;
  90. public void Awake()
  91. {
  92. this.RunningTasks = new Queue<ActorTask>();
  93. this.WaitingTasks = new Queue<ActorTask>();
  94. this.WindowSize = 1;
  95. this.CancellationTokenSource = new CancellationTokenSource();
  96. }
  97. public void Start()
  98. {
  99. this.UpdateAsync();
  100. }
  101. private void Add(ActorTask task)
  102. {
  103. this.WaitingTasks.Enqueue(task);
  104. this.AllowGet();
  105. }
  106. private void Remove()
  107. {
  108. this.RunningTasks.Dequeue();
  109. this.AllowGet();
  110. }
  111. private void AllowGet()
  112. {
  113. if (this.tcs == null || this.WaitingTasks.Count <= 0 || this.RunningTasks.Count >= this.WindowSize)
  114. {
  115. return;
  116. }
  117. var t = this.tcs;
  118. this.tcs = null;
  119. ActorTask task = this.WaitingTasks.Dequeue();
  120. this.RunningTasks.Enqueue(task);
  121. t.SetResult(task);
  122. }
  123. private Task<ActorTask> GetAsync()
  124. {
  125. if (this.WaitingTasks.Count > 0)
  126. {
  127. ActorTask task = this.WaitingTasks.Dequeue();
  128. this.RunningTasks.Enqueue(task);
  129. return Task.FromResult(task);
  130. }
  131. this.tcs = new TaskCompletionSource<ActorTask>();
  132. return this.tcs.Task;
  133. }
  134. private async void UpdateAsync()
  135. {
  136. if (this.Address == null)
  137. {
  138. int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(this.Id);
  139. this.Address = Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().Address;
  140. }
  141. while (true)
  142. {
  143. ActorTask actorTask = await this.GetAsync();
  144. this.RunTask(actorTask);
  145. }
  146. }
  147. private async void RunTask(ActorTask task)
  148. {
  149. try
  150. {
  151. AResponse response = await task.Run();
  152. // 如果没找到Actor,发送窗口减少为1,重试
  153. if (response.Error == ErrorCode.ERR_NotFoundActor)
  154. {
  155. this.CancellationTokenSource.Cancel();
  156. this.WindowSize = 1;
  157. ++this.failTimes;
  158. while (this.WaitingTasks.Count > 0)
  159. {
  160. ActorTask actorTask = this.WaitingTasks.Dequeue();
  161. this.RunningTasks.Enqueue(actorTask);
  162. }
  163. ObjectHelper.Swap(ref this.RunningTasks, ref this.WaitingTasks);
  164. // 失败3次则清空actor发送队列,返回失败
  165. if (this.failTimes > 3)
  166. {
  167. while (this.WaitingTasks.Count > 0)
  168. {
  169. ActorTask actorTask = this.WaitingTasks.Dequeue();
  170. actorTask.RunFail(response.Error);
  171. }
  172. return;
  173. }
  174. // 等待一会再发送
  175. await this.Parent.GetComponent<TimerComponent>().WaitAsync(this.failTimes * 500);
  176. int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(this.Id);
  177. this.Address = Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().Address;
  178. this.CancellationTokenSource = new CancellationTokenSource();
  179. this.AllowGet();
  180. return;
  181. }
  182. // 发送成功
  183. this.failTimes = 0;
  184. if (this.WindowSize < MaxWindowSize)
  185. {
  186. ++this.WindowSize;
  187. }
  188. this.Remove();
  189. }
  190. catch (Exception e)
  191. {
  192. Log.Error(e.ToString());
  193. }
  194. }
  195. public void Send(AMessage message)
  196. {
  197. ActorMessageTask task = new ActorMessageTask(this, message);
  198. this.Add(task);
  199. }
  200. public Task<Response> Call<Response>(ARequest request)where Response : AResponse
  201. {
  202. ActorRpcTask<Response> task = new ActorRpcTask<Response>(this, request);
  203. this.Add(task);
  204. return task.Tcs.Task;
  205. }
  206. public async Task<Response> RealCall<Response>(ActorRequest request, CancellationToken cancellationToken) where Response: AResponse
  207. {
  208. try
  209. {
  210. request.Id = this.Id;
  211. Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.Address);
  212. Response response = await session.Call<Response>(request, cancellationToken);
  213. return response;
  214. }
  215. catch (RpcException e)
  216. {
  217. Log.Error($"{this.Address} {e}");
  218. throw;
  219. }
  220. }
  221. public override void Dispose()
  222. {
  223. if (this.Id == 0)
  224. {
  225. return;
  226. }
  227. base.Dispose();
  228. }
  229. }
  230. }