ActorProxy.cs 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. namespace Model
  6. {
  /// <summary>
  /// Base type for a unit of work queued on an <see cref="ActorProxy"/>.
  /// </summary>
  public abstract class ActorTask
  {
    /// <summary>
    /// Notifies the task that it will not be sent.
    /// </summary>
    /// <param name="error">Error code describing why the task was abandoned.</param>
    public abstract void RunFail(int error);

    /// <summary>
    /// Performs the task.
    /// </summary>
    /// <returns>The response produced by the call.</returns>
    public abstract Task<AResponse> Run();
  }
  /// <summary>
  /// Plain message: the sender does not need the response.
  /// </summary>
  public class ActorMessageTask : ActorTask
  {
    private readonly ActorProxy proxy;
    private readonly ARequest message;

    /// <summary>
    /// Binds a message to the proxy that will deliver it.
    /// </summary>
    /// <param name="proxy">Proxy used to perform the call.</param>
    /// <param name="message">Message to deliver.</param>
    public ActorMessageTask(ActorProxy proxy, ARequest message)
    {
      this.message = message;
      this.proxy = proxy;
    }

    /// <summary>
    /// No-op: nobody is waiting on a plain message, so there is nothing to notify.
    /// </summary>
    /// <param name="error">Ignored.</param>
    public override void RunFail(int error)
    {
    }

    /// <summary>
    /// Sends the message through the proxy using the proxy's current cancellation token.
    /// </summary>
    /// <returns>The <c>ActorMessageResponse</c> returned by the call.</returns>
    public override async Task<AResponse> Run()
    {
      CancellationToken token = this.proxy.CancellationTokenSource.Token;
      return await this.proxy.RealCall<ActorMessageResponse>(this.message, token);
    }
  }
  /// <summary>
  /// RPC message: the caller waits for the reply through <see cref="Tcs"/>.
  /// </summary>
  /// <typeparam name="Response">Response type expected for the request.</typeparam>
  public class ActorRpcTask<Response> : ActorTask where Response : AActorResponse
  {
    private readonly ActorProxy proxy;
    private readonly AActorRequest message;

    /// <summary>
    /// Completed with the reply by <see cref="Run"/>, or faulted by <see cref="RunFail"/>.
    /// </summary>
    public readonly TaskCompletionSource<Response> Tcs = new TaskCompletionSource<Response>();

    /// <summary>
    /// Binds a request to the proxy that will deliver it.
    /// </summary>
    /// <param name="proxy">Proxy used to perform the call.</param>
    /// <param name="message">Request to deliver.</param>
    public ActorRpcTask(ActorProxy proxy, AActorRequest message)
    {
      this.message = message;
      this.proxy = proxy;
    }

    /// <summary>
    /// Faults <see cref="Tcs"/> with an <c>RpcException</c> carrying the given error code.
    /// </summary>
    /// <param name="error">Error code passed to the exception.</param>
    public override void RunFail(int error)
    {
      RpcException failure = new RpcException(error, "");
      this.Tcs.SetException(failure);
    }

    /// <summary>
    /// Sends the request through the proxy using the proxy's current cancellation token.
    /// </summary>
    /// <returns>The reply, including a reply whose error is <c>ERR_NotFoundActor</c>.</returns>
    public override async Task<AResponse> Run()
    {
      CancellationToken token = this.proxy.CancellationTokenSource.Token;
      Response reply = await this.proxy.RealCall<Response>(this.message, token);

      // An "actor not found" reply leaves Tcs pending; only other replies complete it.
      if (reply.Error == ErrorCode.ERR_NotFoundActor)
      {
        return reply;
      }

      this.Tcs.SetResult(reply);
      return reply;
    }
  }
  /// <summary>
  /// Forwards the Awake and Start callbacks to the <see cref="ActorProxy"/> returned by <c>Get()</c>.
  /// </summary>
  [ObjectEvent]
  public class ActorProxyEvent : ObjectEvent<ActorProxy>, IAwake, IStart
  {
    /// <summary>
    /// Calls <see cref="ActorProxy.Awake"/> on the target proxy.
    /// </summary>
    public void Awake()
    {
      ActorProxy target = this.Get();
      target.Awake();
    }

    /// <summary>
    /// Calls <see cref="ActorProxy.Start"/> on the target proxy.
    /// </summary>
    public void Start()
    {
      ActorProxy target = this.Get();
      target.Start();
    }
  }
  /// <summary>
  /// Queues messages for one actor and sends them through a bounded window.
  /// The window grows by one after each successful reply (up to <see cref="MaxWindowSize"/>)
  /// and is reset to one when the actor is reported as not found.
  /// </summary>
  public sealed class ActorProxy : Entity
  {
    // Address of the actor
    public string Address;

    // Messages already sent and awaiting a reply
    public Queue<ActorTask> RunningTasks;

    // Messages not sent yet
    public Queue<ActorTask> WaitingTasks;

    // Send window size
    public int WindowSize = 1;

    // Maximum window size
    public const int MaxWindowSize = 100;

    // Non-null while the send loop is parked waiting for a task it is allowed to send
    private TaskCompletionSource<ActorTask> tcs;

    // Source of the token handed to every call; replaced whenever it is cancelled
    public CancellationTokenSource CancellationTokenSource;

    // Consecutive "actor not found" replies; reset by any successful reply
    private int failTimes;

    /// <summary>
    /// Creates the queues and the cancellation source and resets the window to one.
    /// </summary>
    public void Awake()
    {
      this.RunningTasks = new Queue<ActorTask>();
      this.WaitingTasks = new Queue<ActorTask>();
      this.WindowSize = 1;
      this.CancellationTokenSource = new CancellationTokenSource();
    }

    /// <summary>
    /// Starts the send loop.
    /// </summary>
    public void Start()
    {
      this.UpdateAsync();
    }

    /// <summary>
    /// Queues a plain message; the response is not reported back to the caller.
    /// </summary>
    /// <param name="message">Message to send; its Id is overwritten with this proxy's Id.</param>
    public void Send(AActorMessage message)
    {
      message.Id = this.Id;
      this.Add(new ActorMessageTask(this, message));
    }

    /// <summary>
    /// Queues an RPC request.
    /// </summary>
    /// <typeparam name="Response">Expected response type.</typeparam>
    /// <param name="request">Request to send; its Id is overwritten with this proxy's Id.</param>
    /// <returns>A task completed with the reply, or faulted with RpcException when the queue is abandoned.</returns>
    public Task<Response> Call<Response>(AActorRequest request) where Response : AActorResponse
    {
      request.Id = this.Id;
      ActorRpcTask<Response> rpcTask = new ActorRpcTask<Response>(this, request);
      this.Add(rpcTask);
      return rpcTask.Tcs.Task;
    }

    /// <summary>
    /// Performs the network call against the session for the current <see cref="Address"/>.
    /// </summary>
    /// <typeparam name="Response">Expected response type.</typeparam>
    /// <param name="request">Request to send.</param>
    /// <param name="cancellationToken">Token passed to the session call.</param>
    /// <returns>The response returned by the session.</returns>
    /// <exception cref="RpcException">Logged together with the address, then rethrown.</exception>
    public async Task<Response> RealCall<Response>(ARequest request, CancellationToken cancellationToken) where Response : AResponse
    {
      try
      {
        Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.Address);
        return await session.Call<Response>(request, cancellationToken);
      }
      catch (RpcException e)
      {
        Log.Error($"{this.Address} {e}");
        throw;
      }
    }

    /// <summary>
    /// Disposes the entity unless its Id is already zero.
    /// </summary>
    public override void Dispose()
    {
      if (this.Id == 0)
      {
        return;
      }

      base.Dispose();
    }

    // Queues a task and wakes the send loop if the window allows it.
    private void Add(ActorTask task)
    {
      this.WaitingTasks.Enqueue(task);
      this.AllowGet();
    }

    // Frees the oldest in-flight slot and wakes the send loop if the window allows it.
    // NOTE(review): this dequeues the head of RunningTasks, not necessarily the task that
    // just completed; presumably replies are expected in send order.
    private void Remove()
    {
      this.RunningTasks.Dequeue();
      this.AllowGet();
    }

    // True when a task is waiting and the window has a free slot.
    private bool CanDispatch()
    {
      return this.WaitingTasks.Count > 0 && this.RunningTasks.Count < this.WindowSize;
    }

    // Moves the oldest waiting task into the running queue and returns it.
    private ActorTask MoveNextToRunning()
    {
      ActorTask next = this.WaitingTasks.Dequeue();
      this.RunningTasks.Enqueue(next);
      return next;
    }

    // Hands the next task to a parked send loop, if there is one and the window allows it.
    private void AllowGet()
    {
      if (this.tcs == null || !this.CanDispatch())
      {
        return;
      }

      // Clear the field before completing: SetResult may resume the loop, which can park again.
      TaskCompletionSource<ActorTask> parked = this.tcs;
      this.tcs = null;
      parked.SetResult(this.MoveNextToRunning());
    }

    // Returns the next task to send. Applies the same window check as AllowGet so the
    // loop cannot drain the waiting queue past WindowSize; otherwise parks until
    // Add/Remove/retry calls AllowGet.
    private Task<ActorTask> GetAsync()
    {
      if (this.CanDispatch())
      {
        return Task.FromResult(this.MoveNextToRunning());
      }

      this.tcs = new TaskCompletionSource<ActorTask>();
      return this.tcs.Task;
    }

    // Looks up where the actor currently lives and stores the resulting address.
    private async Task ResolveAddress()
    {
      int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(this.Id);
      this.Address = Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().Address;
    }

    // Send loop: resolves the address once if needed, then starts every task it is handed.
    private async void UpdateAsync()
    {
      if (this.Address == null)
      {
        await this.ResolveAddress();
      }

      while (true)
      {
        ActorTask next = await this.GetAsync();
        this.RunTask(next);
      }
    }

    // Runs one task and updates the window according to the outcome.
    private async void RunTask(ActorTask task)
    {
      try
      {
        AResponse response = await task.Run();

        // Actor not found: shrink the window to 1 and retry
        if (response.Error == ErrorCode.ERR_NotFoundActor)
        {
          await this.OnActorNotFound(response);
          return;
        }

        // Sent successfully
        this.failTimes = 0;
        if (this.WindowSize < MaxWindowSize)
        {
          ++this.WindowSize;
        }

        this.Remove();
      }
      catch (Exception e)
      {
        // NOTE(review): the failed task is only logged here; it is not removed from
        // RunningTasks and RunFail is not called, so its window slot stays occupied.
        Log.Error(e.ToString());
      }
    }

    // Handles an "actor not found" reply: cancels in-flight calls, requeues everything,
    // then either gives up (more than 3 consecutive failures) or backs off and retries.
    private async Task OnActorNotFound(AResponse response)
    {
      // A cancelled source can never be reused, so install a fresh one right away.
      // Otherwise the give-up path below and any task started during the back-off
      // would run with an already-cancelled token.
      this.CancellationTokenSource.Cancel();
      this.CancellationTokenSource = new CancellationTokenSource();

      this.WindowSize = 1;
      ++this.failTimes;

      // Put in-flight tasks back in front of the waiting ones, preserving order.
      while (this.WaitingTasks.Count > 0)
      {
        this.RunningTasks.Enqueue(this.WaitingTasks.Dequeue());
      }

      ObjectHelper.Swap(ref this.RunningTasks, ref this.WaitingTasks);

      // After more than 3 failures, clear the send queue and report failure
      if (this.failTimes > 3)
      {
        while (this.WaitingTasks.Count > 0)
        {
          this.WaitingTasks.Dequeue().RunFail(response.Error);
        }

        return;
      }

      // Wait a while before sending again
      await this.Parent.GetComponent<TimerComponent>().WaitAsync(this.failTimes * 500);
      await this.ResolveAddress();
      this.AllowGet();
    }
  }
  229. }