// ActorProxy.cs
  using System;
  using System.Collections.Generic;
  using System.Threading;
  using System.Threading.Tasks;
  4. namespace Model
  5. {
  /// <summary>
  /// Base type for a unit of work that an <see cref="ActorProxy"/> keeps in its task queues.
  /// </summary>
  public abstract class ActorTask
  {
      /// <summary>
      /// Fails the task with the given error code instead of delivering a response.
      /// </summary>
      /// <param name="error">Error code describing why the task failed.</param>
      public abstract void RunFail(int error);

      /// <summary>
      /// Executes the task.
      /// </summary>
      /// <returns>The response produced by the task.</returns>
      public abstract Task<AResponse> Run();
  }
  /// <summary>
  /// Ordinary message; no response is handed back to the sender.
  /// </summary>
  public class ActorMessageTask: ActorTask
  {
      private readonly ActorProxy proxy;
      private readonly ARequest message;

      /// <summary>
      /// Creates a task that sends <paramref name="message"/> through <paramref name="proxy"/>.
      /// </summary>
      public ActorMessageTask(ActorProxy proxy, ARequest message)
      {
          this.message = message;
          this.proxy = proxy;
      }

      /// <summary>
      /// Intentionally empty: there is nothing to notify when a plain message fails.
      /// </summary>
      public override void RunFail(int error)
      {
      }

      /// <summary>
      /// Sends the message via <see cref="ActorProxy.RealCall{Response}"/> using the proxy's
      /// current cancellation token and returns the <c>ActorMessageResponse</c> it yields.
      /// </summary>
      public override async Task<AResponse> Run()
      {
          CancellationToken token = this.proxy.CancellationTokenSource.Token;
          return await this.proxy.RealCall<ActorMessageResponse>(this.message, token);
      }
  }
  /// <summary>
  /// RPC message: the caller waits on <see cref="Tcs"/> for the typed response.
  /// </summary>
  /// <typeparam name="Response">Concrete response type expected for the request.</typeparam>
  public class ActorRpcTask<Response> : ActorTask where Response: AActorResponse
  {
      private readonly ActorProxy proxy;
      private readonly AActorRequest message;

      /// <summary>
      /// Completed with the response by <see cref="Run"/>, or faulted with an
      /// <c>RpcException</c> by <see cref="RunFail"/>.
      /// </summary>
      public readonly TaskCompletionSource<Response> Tcs = new TaskCompletionSource<Response>();

      /// <summary>
      /// Creates a task that sends <paramref name="message"/> through <paramref name="proxy"/>.
      /// </summary>
      public ActorRpcTask(ActorProxy proxy, AActorRequest message)
      {
          this.proxy = proxy;
          this.message = message;
      }

      /// <summary>
      /// Sends the request using the proxy's current cancellation token.
      /// </summary>
      /// <returns>
      /// The response. It is always returned, including when its error is
      /// <c>ERR_NotFoundActor</c>; in that case <see cref="Tcs"/> is left uncompleted.
      /// </returns>
      public override async Task<AResponse> Run()
      {
          Response response = await this.proxy.RealCall<Response>(this.message, this.proxy.CancellationTokenSource.Token);
          if (response.Error != ErrorCode.ERR_NotFoundActor)
          {
              // TrySetResult instead of SetResult: the proxy may re-queue and re-run a task,
              // and SetResult throws InvalidOperationException on an already completed source.
              this.Tcs.TrySetResult(response);
          }
          return response;
      }

      /// <summary>
      /// Faults <see cref="Tcs"/> with an <c>RpcException</c> carrying <paramref name="error"/>.
      /// Does nothing if <see cref="Tcs"/> has already been completed.
      /// </summary>
      public override void RunFail(int error)
      {
          this.Tcs.TrySetException(new RpcException(error, ""));
      }
  }
  /// <summary>
  /// Sends messages to a single actor through a sliding send window.
  /// Tasks wait in <see cref="WaitingTasks"/>, move to <see cref="RunningTasks"/> when they
  /// are dispatched, and are removed once a response arrives. When a response reports
  /// <c>ERR_NotFoundActor</c> the in-flight calls are cancelled, every task is re-queued in
  /// its original order, the address is looked up again and sending restarts with a window of 1.
  /// </summary>
  public sealed class ActorProxy : Entity
  {
      // Address of the actor.
      public string Address;

      // Messages that have been sent and are waiting for a reply.
      // Initialised here because the constructor starts the dispatch loop immediately.
      public Queue<ActorTask> RunningTasks = new Queue<ActorTask>();

      // Messages that have not been sent yet.
      public Queue<ActorTask> WaitingTasks = new Queue<ActorTask>();

      // Current send window size.
      public int WindowSize = 1;

      // Upper bound of the send window.
      public const int MaxWindowSize = 100;

      // The queue is failed once failTimes exceeds this value.
      private const int MaxFailTimes = 3;

      // Pending request of the dispatch loop for the next task; null when the loop is not waiting.
      private TaskCompletionSource<ActorTask> tcs;

      // Token source of the current batch of sends; cancelled and replaced when the actor is not found.
      public CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();

      // Consecutive ERR_NotFoundActor failures.
      private int failTimes;

      // True while a not-found failure is being handled (back-off and address lookup);
      // nothing is dispatched during that time.
      private bool relocating;

      public ActorProxy(long id): base(id)
      {
          this.UpdateAsync();
      }

      // Queues a task and dispatches it right away if the window allows.
      private void Add(ActorTask task)
      {
          this.WaitingTasks.Enqueue(task);
          this.AllowGet();
      }

      // Drops the oldest in-flight task and tries to dispatch the next waiting one.
      private void Remove()
      {
          this.RunningTasks.Dequeue();
          this.AllowGet();
      }

      // True when a waiting task exists, the window has room and no relocation is in progress.
      private bool CanDispatch()
      {
          return !this.relocating
                  && this.WaitingTasks.Count > 0
                  && this.RunningTasks.Count < this.WindowSize;
      }

      // Hands the next waiting task to the dispatch loop if the loop is waiting for one.
      private void AllowGet()
      {
          if (this.tcs == null || !this.CanDispatch())
          {
              return;
          }

          // Clear the field before completing: completing may resume the loop, which sets a new tcs.
          var t = this.tcs;
          this.tcs = null;
          ActorTask task = this.WaitingTasks.Dequeue();
          this.RunningTasks.Enqueue(task);
          t.SetResult(task);
      }

      // Returns the next task to send, or a pending task that AllowGet completes later.
      // Applies the same window check as AllowGet so the loop cannot exceed WindowSize.
      private Task<ActorTask> GetAsync()
      {
          if (this.CanDispatch())
          {
              ActorTask task = this.WaitingTasks.Dequeue();
              this.RunningTasks.Enqueue(task);
              return Task.FromResult(task);
          }

          this.tcs = new TaskCompletionSource<ActorTask>();
          return this.tcs.Task;
      }

      // Dispatch loop: starts each task without awaiting its completion.
      private async void UpdateAsync()
      {
          while (true)
          {
              ActorTask actorTask = await this.GetAsync();
              this.RunTask(actorTask);
          }
      }

      // Runs one task and updates window / retry state from its response.
      private async void RunTask(ActorTask task)
      {
          // The token source this task is sent with. Once it is cancelled the task has
          // already been re-queued, so whatever this run produces must be ignored.
          CancellationTokenSource generation = this.CancellationTokenSource;

          AResponse response;
          try
          {
              response = await task.Run();
          }
          catch (Exception)
          {
              if (generation.IsCancellationRequested)
              {
                  return;
              }
              throw;
          }

          if (generation.IsCancellationRequested)
          {
              return;
          }

          // Actor not found: shrink the window to 1 and retry.
          if (response.Error == ErrorCode.ERR_NotFoundActor)
          {
              this.relocating = true;
              this.CancellationTokenSource.Cancel();
              this.WindowSize = 1;
              ++this.failTimes;

              // Append the waiting tasks behind the in-flight ones, then swap the queues:
              // WaitingTasks now holds every task in its original order, RunningTasks is empty.
              while (this.WaitingTasks.Count > 0)
              {
                  ActorTask actorTask = this.WaitingTasks.Dequeue();
                  this.RunningTasks.Enqueue(actorTask);
              }
              ObjectHelper.Swap(ref this.RunningTasks, ref this.WaitingTasks);

              // Too many consecutive failures: fail everything that is queued.
              if (this.failTimes > MaxFailTimes)
              {
                  while (this.WaitingTasks.Count > 0)
                  {
                      ActorTask actorTask = this.WaitingTasks.Dequeue();
                      actorTask.RunFail(response.Error);
                  }

                  // Start clean so later messages are not sent with a cancelled token
                  // and get their own retries.
                  this.failTimes = 0;
                  this.CancellationTokenSource = new CancellationTokenSource();
                  this.relocating = false;
                  return;
              }

              // Back off, look the address up again, then resume sending.
              await this.Parent.GetComponent<TimerComponent>().WaitAsync(this.failTimes * 500);
              this.Address = await this.Parent.GetComponent<LocationProxyComponent>().Get(this.Id);
              this.CancellationTokenSource = new CancellationTokenSource();
              this.relocating = false;
              this.AllowGet();
              return;
          }

          // Sent successfully: reset the failure count and widen the window.
          this.failTimes = 0;
          if (this.WindowSize < MaxWindowSize)
          {
              ++this.WindowSize;
          }
          this.Remove();
      }

      /// <summary>
      /// Queues a message for which no response is handed back to the caller.
      /// </summary>
      public void Send(AActorMessage message)
      {
          ActorMessageTask task = new ActorMessageTask(this, message);
          this.Add(task);
      }

      /// <summary>
      /// Queues an RPC request.
      /// </summary>
      /// <returns>
      /// A task completed with the response, or faulted with an <c>RpcException</c>
      /// when the request is failed via <c>RunFail</c>.
      /// </returns>
      public Task<Response> Call<Response>(AActorRequest request)where Response : AActorResponse
      {
          ActorRpcTask<Response> task = new ActorRpcTask<Response>(this, request);
          this.Add(task);
          return task.Tcs.Task;
      }

      /// <summary>
      /// Performs the call on the inner-network session for <see cref="Address"/>.
      /// An <c>RpcException</c> is logged and rethrown.
      /// </summary>
      public async Task<Response> RealCall<Response>(ARequest request, CancellationToken cancellationToken) where Response: AResponse
      {
          try
          {
              Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.Address);
              Response response = await session.Call<Response>(request, cancellationToken);
              return response;
          }
          catch (RpcException e)
          {
              Log.Error(e.ToString());
              throw;
          }
      }

      public override void Dispose()
      {
          // Id == 0 is treated as "nothing to dispose" - presumably base.Dispose() resets Id; verify.
          if (this.Id == 0)
          {
              return;
          }

          base.Dispose();
      }
  }
  194. }