ActorProxy.cs 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Net;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MongoDB.Bson.Serialization.Attributes;
  7. namespace Model
  8. {
  9. public abstract class ActorTask
  10. {
  11. [BsonIgnore]
  12. public ActorProxy proxy;
  13. [BsonElement]
  14. public MessageObject message;
  15. public abstract Task<IResponse> Run();
  16. public abstract void RunFail(int error);
  17. }
  18. /// <summary>
  19. /// 普通消息,不需要response
  20. /// </summary>
  21. public class ActorMessageTask: ActorTask
  22. {
  23. public ActorMessageTask(ActorProxy proxy, IMessage message)
  24. {
  25. this.proxy = proxy;
  26. this.message = (MessageObject)message;
  27. }
  28. public override async Task<IResponse> Run()
  29. {
  30. ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message };
  31. ActorResponse response = (ActorResponse)await this.proxy.RealCall(request, this.proxy.CancellationTokenSource.Token);
  32. return response;
  33. }
  34. public override void RunFail(int error)
  35. {
  36. }
  37. }
  38. /// <summary>
  39. /// Rpc消息,需要等待返回
  40. /// </summary>
  41. public class ActorRpcTask : ActorTask
  42. {
  43. [BsonIgnore]
  44. public readonly TaskCompletionSource<IResponse> Tcs = new TaskCompletionSource<IResponse>();
  45. public ActorRpcTask(ActorProxy proxy, IMessage message)
  46. {
  47. this.proxy = proxy;
  48. this.message = (MessageObject)message;
  49. }
  50. public override async Task<IResponse> Run()
  51. {
  52. ActorRequest request = new ActorRequest() { Id = this.proxy.Id, AMessage = this.message };
  53. ActorResponse response = (ActorResponse)await this.proxy.RealCall(request, this.proxy.CancellationTokenSource.Token);
  54. if (response.Error != ErrorCode.ERR_NotFoundActor)
  55. {
  56. this.Tcs.SetResult((IResponse)response.AMessage);
  57. }
  58. return response;
  59. }
  60. public override void RunFail(int error)
  61. {
  62. this.Tcs.SetException(new RpcException(error, ""));
  63. }
  64. }
  65. [ObjectSystem]
  66. public class ActorProxySystem : ObjectSystem<ActorProxy>, IAwake, IStart
  67. {
  68. public void Awake()
  69. {
  70. this.Get().Awake();
  71. }
  72. public void Start()
  73. {
  74. this.Get().Start();
  75. }
  76. }
  77. public sealed class ActorProxy : Disposer
  78. {
  79. // actor的地址
  80. public IPEndPoint Address;
  81. // 已发送等待回应的消息
  82. public Queue<ActorTask> RunningTasks = new Queue<ActorTask>();
  83. // 还没发送的消息
  84. public Queue<ActorTask> WaitingTasks = new Queue<ActorTask>();
  85. // 发送窗口大小
  86. public int WindowSize = 1;
  87. // 最大窗口
  88. public const int MaxWindowSize = 1;
  89. // 最近发送消息的时间
  90. public long LastSendTime;
  91. private TaskCompletionSource<ActorTask> tcs;
  92. public CancellationTokenSource CancellationTokenSource;
  93. private int failTimes;
  94. public void Awake()
  95. {
  96. this.LastSendTime = TimeHelper.Now();
  97. this.RunningTasks.Clear();
  98. this.WaitingTasks.Clear();
  99. this.WindowSize = 1;
  100. this.tcs = null;
  101. this.CancellationTokenSource = new CancellationTokenSource();
  102. }
  103. public override void Dispose()
  104. {
  105. if (this.Id == 0)
  106. {
  107. return;
  108. }
  109. base.Dispose();
  110. this.LastSendTime = 0;
  111. this.Address = null;
  112. this.RunningTasks.Clear();
  113. this.WaitingTasks.Clear();
  114. this.failTimes = 0;
  115. var t = this.tcs;
  116. this.tcs = null;
  117. t?.SetResult(null);
  118. }
  119. public async void Start()
  120. {
  121. int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(this.Id);
  122. this.Address = Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().IPEndPoint;
  123. this.UpdateAsync();
  124. }
  125. private void Add(ActorTask task)
  126. {
  127. if (this.Id == 0)
  128. {
  129. throw new Exception("ActorProxy Disposed! dont hold actorproxy");
  130. }
  131. this.WaitingTasks.Enqueue(task);
  132. // failtimes > 0表示正在重试,这时候不能加到正在发送队列
  133. if (this.failTimes == 0)
  134. {
  135. this.AllowGet();
  136. }
  137. }
  138. private void Remove()
  139. {
  140. this.RunningTasks.Dequeue();
  141. this.AllowGet();
  142. }
  143. private void AllowGet()
  144. {
  145. if (this.tcs == null || this.WaitingTasks.Count <= 0 || this.RunningTasks.Count >= this.WindowSize)
  146. {
  147. return;
  148. }
  149. ActorTask task = this.WaitingTasks.Dequeue();
  150. this.RunningTasks.Enqueue(task);
  151. var t = this.tcs;
  152. this.tcs = null;
  153. t.SetResult(task);
  154. }
  155. private Task<ActorTask> GetAsync()
  156. {
  157. if (this.WaitingTasks.Count > 0)
  158. {
  159. ActorTask task = this.WaitingTasks.Dequeue();
  160. this.RunningTasks.Enqueue(task);
  161. return Task.FromResult(task);
  162. }
  163. this.tcs = new TaskCompletionSource<ActorTask>();
  164. return this.tcs.Task;
  165. }
  166. private async void UpdateAsync()
  167. {
  168. while (true)
  169. {
  170. ActorTask actorTask = await this.GetAsync();
  171. if (this.Id == 0)
  172. {
  173. return;
  174. }
  175. if (actorTask == null)
  176. {
  177. return;
  178. }
  179. try
  180. {
  181. this.RunTask(actorTask);
  182. }
  183. catch (Exception e)
  184. {
  185. Log.Error(e.ToString());
  186. return;
  187. }
  188. }
  189. }
  190. private async void RunTask(ActorTask task)
  191. {
  192. try
  193. {
  194. IResponse response = await task.Run();
  195. // 如果没找到Actor,发送窗口减少为1,重试
  196. if (response.Error == ErrorCode.ERR_NotFoundActor)
  197. {
  198. this.CancellationTokenSource.Cancel();
  199. this.WindowSize = 1;
  200. ++this.failTimes;
  201. while (this.WaitingTasks.Count > 0)
  202. {
  203. ActorTask actorTask = this.WaitingTasks.Dequeue();
  204. this.RunningTasks.Enqueue(actorTask);
  205. }
  206. ObjectHelper.Swap(ref this.RunningTasks, ref this.WaitingTasks);
  207. // 失败3次则清空actor发送队列,返回失败
  208. if (this.failTimes > 3)
  209. {
  210. while (this.WaitingTasks.Count > 0)
  211. {
  212. ActorTask actorTask = this.WaitingTasks.Dequeue();
  213. actorTask.RunFail(response.Error);
  214. }
  215. // 失败直接删除actorproxy
  216. Game.Scene.GetComponent<ActorProxyComponent>().Remove(this.Id);
  217. return;
  218. }
  219. // 等待一会再发送
  220. await Game.Scene.GetComponent<TimerComponent>().WaitAsync(this.failTimes * 500);
  221. int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(this.Id);
  222. this.Address = Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().IPEndPoint;
  223. this.CancellationTokenSource = new CancellationTokenSource();
  224. this.AllowGet();
  225. return;
  226. }
  227. // 发送成功
  228. this.LastSendTime = TimeHelper.Now();
  229. this.failTimes = 0;
  230. if (this.WindowSize < MaxWindowSize)
  231. {
  232. ++this.WindowSize;
  233. }
  234. this.Remove();
  235. }
  236. catch (Exception e)
  237. {
  238. Log.Error(e.ToString());
  239. }
  240. }
  241. public void Send(IMessage message)
  242. {
  243. ActorMessageTask task = new ActorMessageTask(this, message);
  244. this.Add(task);
  245. }
  246. public Task<IResponse> Call(IRequest request)
  247. {
  248. ActorRpcTask task = new ActorRpcTask(this, request);
  249. this.Add(task);
  250. return task.Tcs.Task;
  251. }
  252. public async Task<IResponse> RealCall(ActorRequest request, CancellationToken cancellationToken)
  253. {
  254. try
  255. {
  256. //Log.Debug($"realcall {MongoHelper.ToJson(request)} {this.Address}");
  257. request.Id = this.Id;
  258. Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.Address);
  259. IResponse response = await session.Call(request, cancellationToken);
  260. return response;
  261. }
  262. catch (RpcException e)
  263. {
  264. Log.Error($"{this.Address} {e}");
  265. throw;
  266. }
  267. }
  268. public string DebugQueue(Queue<ActorTask> tasks)
  269. {
  270. string s = "";
  271. foreach (ActorTask task in tasks)
  272. {
  273. s += $" {task.message.GetType().Name}";
  274. }
  275. return s;
  276. }
  277. }
  278. }