ActorProxy.cs 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Net;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MongoDB.Bson.Serialization.Attributes;
  7. namespace Model
  8. {
  9. public abstract class ActorTask
  10. {
  11. [BsonIgnore]
  12. public ActorProxy proxy;
  13. [BsonElement]
  14. public AMessage message;
  15. public abstract Task<AResponse> Run();
  16. public abstract void RunFail(int error);
  17. }
  18. /// <summary>
  19. /// 普通消息,不需要response
  20. /// </summary>
  21. public class ActorMessageTask: ActorTask
  22. {
  23. public ActorMessageTask(ActorProxy proxy, AMessage message)
  24. {
  25. this.proxy = proxy;
  26. this.message = message;
  27. }
  28. public override async Task<AResponse> Run()
  29. {
  30. ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message };
  31. ActorResponse response = await this.proxy.RealCall<ActorResponse>(request, this.proxy.CancellationTokenSource.Token);
  32. return response;
  33. }
  34. public override void RunFail(int error)
  35. {
  36. }
  37. }
  38. /// <summary>
  39. /// Rpc消息,需要等待返回
  40. /// </summary>
  41. /// <typeparam name="Response"></typeparam>
  42. public class ActorRpcTask<Response> : ActorTask where Response: AResponse
  43. {
  44. [BsonIgnore]
  45. public readonly TaskCompletionSource<Response> Tcs = new TaskCompletionSource<Response>();
  46. public ActorRpcTask(ActorProxy proxy, ARequest message)
  47. {
  48. this.proxy = proxy;
  49. this.message = message;
  50. }
  51. public override async Task<AResponse> Run()
  52. {
  53. ActorRpcRequest request = new ActorRpcRequest() { Id = this.proxy.Id, AMessage = this.message };
  54. ActorRpcResponse response = await this.proxy.RealCall<ActorRpcResponse>(request, this.proxy.CancellationTokenSource.Token);
  55. if (response.Error != ErrorCode.ERR_NotFoundActor)
  56. {
  57. this.Tcs.SetResult((Response)response.AMessage);
  58. }
  59. return response;
  60. }
  61. public override void RunFail(int error)
  62. {
  63. this.Tcs.SetException(new RpcException(error, ""));
  64. }
  65. }
  66. [ObjectSystem]
  67. public class ActorProxySystem : ObjectSystem<ActorProxy>, IAwake, IStart
  68. {
  69. public void Awake()
  70. {
  71. this.Get().Awake();
  72. }
  73. public void Start()
  74. {
  75. this.Get().Start();
  76. }
  77. }
  78. public sealed class ActorProxy : Disposer
  79. {
  80. // actor的地址
  81. public IPEndPoint Address;
  82. // 已发送等待回应的消息
  83. public Queue<ActorTask> RunningTasks = new Queue<ActorTask>();
  84. // 还没发送的消息
  85. public Queue<ActorTask> WaitingTasks = new Queue<ActorTask>();
  86. // 发送窗口大小
  87. public int WindowSize = 1;
  88. // 最大窗口
  89. public const int MaxWindowSize = 1;
  90. // 最近发送消息的时间
  91. public long LastSendTime;
  92. private TaskCompletionSource<ActorTask> tcs;
  93. public CancellationTokenSource CancellationTokenSource;
  94. private int failTimes;
  95. public void Awake()
  96. {
  97. this.LastSendTime = TimeHelper.Now();
  98. this.RunningTasks.Clear();
  99. this.WaitingTasks.Clear();
  100. this.WindowSize = 1;
  101. this.tcs = null;
  102. this.CancellationTokenSource = new CancellationTokenSource();
  103. }
  104. public override void Dispose()
  105. {
  106. if (this.Id == 0)
  107. {
  108. return;
  109. }
  110. base.Dispose();
  111. this.LastSendTime = 0;
  112. this.Address = null;
  113. this.RunningTasks.Clear();
  114. this.WaitingTasks.Clear();
  115. this.failTimes = 0;
  116. var t = this.tcs;
  117. this.tcs = null;
  118. t?.SetResult(null);
  119. }
  120. public async void Start()
  121. {
  122. int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(this.Id);
  123. this.Address = Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().IPEndPoint;
  124. this.UpdateAsync();
  125. }
  126. private void Add(ActorTask task)
  127. {
  128. if (this.Id == 0)
  129. {
  130. throw new Exception("ActorProxy Disposed! dont hold actorproxy");
  131. }
  132. this.WaitingTasks.Enqueue(task);
  133. // failtimes > 0表示正在重试,这时候不能加到正在发送队列
  134. if (this.failTimes == 0)
  135. {
  136. this.AllowGet();
  137. }
  138. }
  139. private void Remove()
  140. {
  141. this.RunningTasks.Dequeue();
  142. this.AllowGet();
  143. }
  144. private void AllowGet()
  145. {
  146. if (this.tcs == null || this.WaitingTasks.Count <= 0 || this.RunningTasks.Count >= this.WindowSize)
  147. {
  148. return;
  149. }
  150. ActorTask task = this.WaitingTasks.Dequeue();
  151. this.RunningTasks.Enqueue(task);
  152. var t = this.tcs;
  153. this.tcs = null;
  154. t.SetResult(task);
  155. }
  156. private Task<ActorTask> GetAsync()
  157. {
  158. if (this.WaitingTasks.Count > 0)
  159. {
  160. ActorTask task = this.WaitingTasks.Dequeue();
  161. this.RunningTasks.Enqueue(task);
  162. return Task.FromResult(task);
  163. }
  164. this.tcs = new TaskCompletionSource<ActorTask>();
  165. return this.tcs.Task;
  166. }
  167. private async void UpdateAsync()
  168. {
  169. while (true)
  170. {
  171. ActorTask actorTask = await this.GetAsync();
  172. if (this.Id == 0)
  173. {
  174. return;
  175. }
  176. if (actorTask == null)
  177. {
  178. return;
  179. }
  180. try
  181. {
  182. this.RunTask(actorTask);
  183. }
  184. catch (Exception e)
  185. {
  186. Log.Error(e.ToString());
  187. return;
  188. }
  189. }
  190. }
  191. private async void RunTask(ActorTask task)
  192. {
  193. try
  194. {
  195. AResponse response = await task.Run();
  196. // 如果没找到Actor,发送窗口减少为1,重试
  197. if (response.Error == ErrorCode.ERR_NotFoundActor)
  198. {
  199. this.CancellationTokenSource.Cancel();
  200. this.WindowSize = 1;
  201. ++this.failTimes;
  202. while (this.WaitingTasks.Count > 0)
  203. {
  204. ActorTask actorTask = this.WaitingTasks.Dequeue();
  205. this.RunningTasks.Enqueue(actorTask);
  206. }
  207. ObjectHelper.Swap(ref this.RunningTasks, ref this.WaitingTasks);
  208. // 失败3次则清空actor发送队列,返回失败
  209. if (this.failTimes > 3)
  210. {
  211. while (this.WaitingTasks.Count > 0)
  212. {
  213. ActorTask actorTask = this.WaitingTasks.Dequeue();
  214. actorTask.RunFail(response.Error);
  215. }
  216. // 失败直接删除actorproxy
  217. Game.Scene.GetComponent<ActorProxyComponent>().Remove(this.Id);
  218. return;
  219. }
  220. // 等待一会再发送
  221. await Game.Scene.GetComponent<TimerComponent>().WaitAsync(this.failTimes * 500);
  222. int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(this.Id);
  223. this.Address = Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().IPEndPoint;
  224. this.CancellationTokenSource = new CancellationTokenSource();
  225. this.AllowGet();
  226. return;
  227. }
  228. // 发送成功
  229. this.LastSendTime = TimeHelper.Now();
  230. this.failTimes = 0;
  231. if (this.WindowSize < MaxWindowSize)
  232. {
  233. ++this.WindowSize;
  234. }
  235. this.Remove();
  236. }
  237. catch (Exception e)
  238. {
  239. Log.Error(e.ToString());
  240. }
  241. }
  242. public void Send(AMessage message)
  243. {
  244. ActorMessageTask task = new ActorMessageTask(this, message);
  245. this.Add(task);
  246. }
  247. public Task<Response> Call<Response>(ARequest request)where Response : AResponse
  248. {
  249. ActorRpcTask<Response> task = new ActorRpcTask<Response>(this, request);
  250. this.Add(task);
  251. return task.Tcs.Task;
  252. }
  253. public async Task<Response> RealCall<Response>(ActorRequest request, CancellationToken cancellationToken) where Response: AResponse
  254. {
  255. try
  256. {
  257. //Log.Debug($"realcall {MongoHelper.ToJson(request)} {this.Address}");
  258. request.Id = this.Id;
  259. Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.Address);
  260. Response response = await session.Call<Response>(request, cancellationToken);
  261. return response;
  262. }
  263. catch (RpcException e)
  264. {
  265. Log.Error($"{this.Address} {e}");
  266. throw;
  267. }
  268. }
  269. public string DebugQueue(Queue<ActorTask> tasks)
  270. {
  271. string s = "";
  272. foreach (ActorTask task in tasks)
  273. {
  274. s += $" {task.message.GetType().Name}";
  275. }
  276. return s;
  277. }
  278. }
  279. }