ActorProxy.cs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Net;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. namespace ETModel
  7. {
  // Awake system registered for ActorProxy: its only job is to forward the
  // Awake callback to the component's own Awake() method.
  [ObjectSystem]
  public class ActorProxyAwakeSystem : AwakeSystem<ActorProxy>
  {
      // self: the ActorProxy instance whose Awake() is invoked.
      public override void Awake(ActorProxy self) => self.Awake();
  }
  // Start system registered for ActorProxy: resolves where the actor lives and
  // then starts the proxy's send loop.
  [ObjectSystem]
  public class ActorProxyStartSystem : StartSystem<ActorProxy>
  {
      // Looks up the app id for self.Id through LocationProxyComponent, stores that
      // app's inner endpoint in self.Address and calls self.UpdateAsync().
      //
      // The method has to stay "async void" because it overrides the base signature,
      // so nothing can await it: every exception is caught and logged here instead
      // of escaping unobserved. If the lookup fails the send loop is not started.
      public override async void Start(ActorProxy self)
      {
          try
          {
              int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(self.Id);

              // The proxy may have been disposed while the lookup was in flight;
              // do not write to it or start a send loop on it in that case.
              if (self.IsDisposed)
              {
                  return;
              }

              self.Address = Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().IPEndPoint;
              self.UpdateAsync();
          }
          catch (Exception e)
          {
              Log.Error(e);
          }
      }
  }
  // Proxy for a single remote actor. Messages are queued in WaitingTasks and a
  // single loop (UpdateAsync) runs them one at a time, in order. The task at the
  // head of the queue stays there until it has been sent successfully; a failed
  // attempt is retried after a delay, and after too many consecutive failures the
  // proxy removes itself from ActorProxyComponent.
  public sealed class ActorProxy : Component
  {
      // Number of consecutive failed attempts tolerated before the proxy removes itself.
      private const int MaxFailTimes = 10;

      // Delay before a failed task is attempted again (1 second).
      private const int RetryDelayMilliseconds = 1000;

      // Address of the actor.
      public IPEndPoint Address;

      // Messages that have not been sent yet.
      public Queue<ActorTask> WaitingTasks = new Queue<ActorTask>();

      // Time of the most recent send (set in Awake and after every successful send).
      public long LastSendTime;

      // Non-null only while the send loop is parked waiting for a task to arrive.
      private TaskCompletionSource<ActorTask> tcs;

      // Cancelled when a send attempt fails and replaced with a fresh instance
      // before sending resumes.
      // NOTE(review): presumably ActorTask.Run passes this token to the in-flight
      // request - confirm in ActorTask.
      public CancellationTokenSource CancellationTokenSource;

      // Consecutive failed attempts for the task at the head of the queue;
      // greater than zero means a retry is in progress.
      private int failTimes;

      // Resets the per-instance state; invoked through ActorProxyAwakeSystem.
      public void Awake()
      {
          this.LastSendTime = TimeHelper.Now();
          this.tcs = null;
          this.CancellationTokenSource = new CancellationTokenSource();
      }

      // Fails every queued task with ERR_NotFoundActor, clears the state and wakes
      // the send loop (if it is parked) so that it can observe IsDisposed and exit.
      public override void Dispose()
      {
          if (this.IsDisposed)
          {
              return;
          }

          base.Dispose();

          this.LastSendTime = 0;
          this.Address = null;

          while (this.WaitingTasks.Count > 0)
          {
              ActorTask actorTask = this.WaitingTasks.Dequeue();
              actorTask.RunFail(ErrorCode.ERR_NotFoundActor);
          }

          this.failTimes = 0;

          // Clear the field before completing the source: continuations may run
          // inline and must not see the stale source.
          var t = this.tcs;
          this.tcs = null;
          t?.SetResult(new ActorTask());
      }

      // Enqueues a task and, unless a retry is in progress, hands it to the send
      // loop if that loop is currently parked.
      // Throws Exception when the proxy has already been disposed.
      private void Add(ActorTask task)
      {
          if (this.IsDisposed)
          {
              throw new Exception("ActorProxy Disposed! dont hold actorproxy");
          }

          this.WaitingTasks.Enqueue(task);

          // failTimes > 0 means a retry is in progress; the new task must not be
          // handed to the send loop at this point.
          if (this.failTimes == 0)
          {
              this.AllowGet();
          }
      }

      // Completes the pending GetAsync() with the task at the head of the queue.
      // Does nothing when the send loop is not parked or the queue is empty.
      private void AllowGet()
      {
          if (this.tcs == null || this.WaitingTasks.Count <= 0)
          {
              return;
          }

          // Peek, not Dequeue: the task is removed only after a successful send.
          ActorTask task = this.WaitingTasks.Peek();

          var t = this.tcs;
          this.tcs = null;
          t.SetResult(task);
      }

      // Returns the task at the head of the queue, or a task that completes once
      // AllowGet() (or Dispose) supplies one.
      private Task<ActorTask> GetAsync()
      {
          if (this.WaitingTasks.Count > 0)
          {
              ActorTask task = this.WaitingTasks.Peek();
              return Task.FromResult(task);
          }

          this.tcs = new TaskCompletionSource<ActorTask>();
          return this.tcs.Task;
      }

      // The send loop: takes the head task and runs it, one at a time, until the
      // proxy is disposed. "async void" is kept because callers invoke it
      // fire-and-forget.
      public async void UpdateAsync()
      {
          // Checking here as well avoids parking on a fresh completion source that
          // nobody would ever complete once the proxy has been disposed.
          while (!this.IsDisposed)
          {
              ActorTask actorTask = await this.GetAsync();
              if (this.IsDisposed)
              {
                  return;
              }

              try
              {
                  await this.RunTask(actorTask);
              }
              catch (Exception e)
              {
                  Log.Error(e);
                  return;
              }
          }
      }

      // Runs one send attempt for the given task (the head of the queue).
      // On success the task is dequeued and the failure counter is reset.
      // An ERR_NotFoundActor response or an exception thrown by the attempt counts
      // as a failure: the task stays queued and RetryLater() schedules the retry.
      // Counting exceptions as failures keeps a throwing task from being re-run
      // immediately and without bound by the send loop.
      private async Task RunTask(ActorTask task)
      {
          try
          {
              bool delivered;
              try
              {
                  IResponse response = await task.Run();
                  delivered = response.Error != ErrorCode.ERR_NotFoundActor;
              }
              catch (Exception e)
              {
                  Log.Error(e);
                  delivered = false;
              }

              // Dispose() may have run while the attempt was in flight; it already
              // drained the queue, so there is nothing left to dequeue or retry.
              if (this.IsDisposed)
              {
                  return;
              }

              if (delivered)
              {
                  // Sent successfully.
                  this.LastSendTime = TimeHelper.Now();
                  this.failTimes = 0;
                  this.WaitingTasks.Dequeue();
                  return;
              }

              await this.RetryLater();
          }
          catch (Exception e)
          {
              Log.Error(e);
          }
      }

      // Records a failed attempt. After more than MaxFailTimes consecutive failures
      // the proxy is removed from ActorProxyComponent; otherwise waits, looks the
      // actor's location up again and prepares a fresh cancellation source so the
      // send loop can try the head task once more.
      private async Task RetryLater()
      {
          // Count first, so the number of attempts stays bounded even if a later
          // step throws.
          ++this.failTimes;
          this.CancellationTokenSource.Cancel();

          if (this.failTimes > MaxFailTimes)
          {
              // Give up: remove this proxy.
              // NOTE(review): Remove presumably disposes the proxy, which fails the
              // queued tasks via Dispose() - confirm in ActorProxyComponent.
              Log.Info($"actor send message fail, actorid: {this.Id}");
              Game.Scene.GetComponent<ActorProxyComponent>().Remove(this.Id);
              return;
          }

          // Wait before sending again.
          await Game.Scene.GetComponent<TimerComponent>().WaitAsync(RetryDelayMilliseconds);
          if (this.IsDisposed)
          {
              return;
          }

          int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(this.Id);
          if (this.IsDisposed)
          {
              return;
          }

          this.Address = Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().IPEndPoint;
          this.CancellationTokenSource = new CancellationTokenSource();
          this.AllowGet();
      }

      // Queues a message for which no response task is returned to the caller.
      // Throws Exception when the proxy has already been disposed.
      public void Send(IActorMessage message)
      {
          ActorTask task = new ActorTask
          {
              message = message,
              proxy = this
          };
          this.Add(task);
      }

      // Queues a request and returns the task of the completion source attached to it.
      // NOTE(review): the source is presumably completed by ActorTask.Run / RunFail -
      // confirm in ActorTask.
      // Throws Exception when the proxy has already been disposed.
      public Task<IResponse> Call(IActorRequest request)
      {
          ActorTask task = new ActorTask
          {
              message = request,
              proxy = this,
              Tcs = new TaskCompletionSource<IResponse>()
          };
          this.Add(task);
          return task.Tcs.Task;
      }

      // Returns the message type names of the given tasks, each preceded by a
      // single space, in queue order. Returns an empty string for an empty queue.
      public string DebugQueue(Queue<ActorTask> tasks)
      {
          var names = new System.Text.StringBuilder();
          foreach (ActorTask task in tasks)
          {
              names.Append(' ').Append(task.message.GetType().Name);
          }

          return names.ToString();
      }
  }
  182. }