ActorProxySystem.cs 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using ETModel;
  6. namespace ETHotfix
  7. {
  /// <summary>
  /// Awake hook for <see cref="ActorProxy"/>; it simply forwards to the proxy's own
  /// <c>Awake()</c> call.
  /// </summary>
  [ObjectSystem]
  public class ActorProxyAwakeSystem : AwakeSystem<ActorProxy>
  {
      /// <summary>Invokes <c>self.Awake()</c>.</summary>
      /// <param name="self">The proxy being awakened.</param>
      public override void Awake(ActorProxy self) => self.Awake();
  }
  /// <summary>
  /// Start hook for <see cref="ActorProxy"/>: resolves the proxy's address and then
  /// starts its send loop.
  /// </summary>
  [ObjectSystem]
  public class ActorProxyStartSystem : StartSystem<ActorProxy>
  {
      /// <summary>
      /// Asks the <c>LocationProxyComponent</c> for the app id registered under
      /// <c>self.Id</c>, stores that app's <c>InnerConfig.IPEndPoint</c> in
      /// <c>self.Address</c> and calls <c>self.UpdateAsync()</c>.
      /// </summary>
      /// <param name="self">The proxy being started.</param>
      public override async void Start(ActorProxy self)
      {
          // This method is async void, so nothing awaits it; a failure has to be
          // handled here or it leaves the method unobserved.
          try
          {
              int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(self.Id);

              // The proxy may have been disposed while the lookup was in flight. In that
              // case do not write state back to it and do not start a send loop for it.
              if (self.IsDisposed)
              {
                  return;
              }

              self.Address = Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().IPEndPoint;
              self.UpdateAsync();
          }
          catch (Exception e)
          {
              Log.Error(e);
          }
      }
  }
  /// <summary>
  /// Destroy hook for <see cref="ActorProxy"/>: resets the proxy's fields, fails every
  /// task still queued on it and completes a pending waiter, if there is one.
  /// </summary>
  [ObjectSystem]
  public class ActorProxyDestroySystem : DestroySystem<ActorProxy>
  {
      /// <summary>Tears down the state held by <paramref name="self"/>.</summary>
      /// <param name="self">The proxy being destroyed.</param>
      public override void Destroy(ActorProxy self)
      {
          self.LastSendTime = 0;
          self.Address = null;

          // Everything still waiting in the queue is failed with ERR_NotFoundActor.
          while (self.WaitingTasks.Count != 0)
          {
              self.WaitingTasks.Dequeue().RunFail(ErrorCode.ERR_NotFoundActor);
          }

          self.failTimes = 0;

          // Clear the field first so that self.tcs is already null by the time
          // SetResult runs, then complete the waiter with an empty ActorTask.
          TaskCompletionSource<ActorTask> pendingWaiter = self.tcs;
          self.tcs = null;
          if (pendingWaiter != null)
          {
              pendingWaiter.SetResult(new ActorTask());
          }
      }
  }
  /// <summary>
  /// Extension methods that implement the send queue of an <see cref="ActorProxy"/>.
  /// <c>Send</c> and <c>Call</c> enqueue tasks into <c>WaitingTasks</c>; <c>UpdateAsync</c>
  /// runs them one at a time, always working on the task at the head of the queue.
  /// </summary>
  public static class ActorProxyEx
  {
      // A task that keeps answering ERR_NotFoundActor is retried until failTimes
      // exceeds this value; after that the proxy is removed.
      private const int MaxFailTimes = 10;

      // Delay before a retry, in milliseconds (1 second).
      private const int RetryDelayMilliseconds = 1000;

      /// <summary>
      /// Initializes the proxy's bookkeeping: stamps <c>LastSendTime</c> with the current
      /// time, clears the waiter and the failure counter and creates a new
      /// <see cref="CancellationTokenSource"/>.
      /// </summary>
      public static void Awake(this ActorProxy self)
      {
          self.LastSendTime = TimeHelper.Now();
          self.tcs = null;
          self.failTimes = 0;
          self.CancellationTokenSource = new CancellationTokenSource();
      }

      /// <summary>
      /// Appends <paramref name="task"/> to the waiting queue and, when no retry is in
      /// progress, wakes the send loop.
      /// </summary>
      /// <exception cref="Exception">The proxy has already been disposed.</exception>
      private static void Add(this ActorProxy self, ActorTask task)
      {
          if (self.IsDisposed)
          {
              throw new Exception("ActorProxy Disposed! dont hold actorproxy");
          }

          self.WaitingTasks.Enqueue(task);

          // failTimes > 0 means a retry is in progress; the send loop must not be
          // handed a task while that is going on.
          if (self.failTimes == 0)
          {
              self.AllowGet();
          }
      }

      /// <summary>
      /// Completes the waiter stored in <c>self.tcs</c> with the task at the head of the
      /// queue. Does nothing when there is no waiter or the queue is empty.
      /// </summary>
      private static void AllowGet(this ActorProxy self)
      {
          if (self.tcs == null || self.WaitingTasks.Count <= 0)
          {
              return;
          }

          // Peek, not Dequeue: the task stays queued until RunTask reports success.
          ActorTask task = self.WaitingTasks.Peek();

          // Clear the field before SetResult so the waiter is completed at most once.
          var t = self.tcs;
          self.tcs = null;
          t.SetResult(task);
      }

      /// <summary>
      /// Returns the task at the head of the queue without removing it. When the queue is
      /// empty, stores a new waiter in <c>self.tcs</c> and returns its task instead.
      /// </summary>
      private static Task<ActorTask> GetAsync(this ActorProxy self)
      {
          if (self.WaitingTasks.Count > 0)
          {
              ActorTask task = self.WaitingTasks.Peek();
              return Task.FromResult(task);
          }

          self.tcs = new TaskCompletionSource<ActorTask>();
          return self.tcs.Task;
      }

      /// <summary>
      /// The send loop: repeatedly obtains the head task and runs it. The loop ends when
      /// the proxy is found disposed or when <c>RunTask</c> throws.
      /// </summary>
      public static async void UpdateAsync(this ActorProxy self)
      {
          while (true)
          {
              ActorTask actorTask = await self.GetAsync();
              if (self.IsDisposed)
              {
                  return;
              }

              try
              {
                  await self.RunTask(actorTask);
              }
              catch (Exception e)
              {
                  Log.Error(e);
                  return;
              }

              // RunTask removes the proxy after too many failures, and the proxy can also
              // be disposed while RunTask is awaiting. Stop here instead of going back to
              // GetAsync, which would park a new waiter on the disposed proxy.
              if (self.IsDisposed)
              {
                  return;
              }
          }
      }

      /// <summary>
      /// Runs one task. On success the task is dequeued and the failure counter reset.
      /// On ERR_NotFoundActor the task stays queued: the address is looked up again after
      /// a delay, or the proxy is removed once the failure limit is exceeded.
      /// </summary>
      private static async Task RunTask(this ActorProxy self, ActorTask task)
      {
          try
          {
              IResponse response = await task.Run();

              // Disposed while the call was in flight: leave the proxy's state alone.
              if (self.IsDisposed)
              {
                  return;
              }

              // The actor was not found: retry.
              if (response.Error == ErrorCode.ERR_NotFoundActor)
              {
                  self.CancellationTokenSource.Cancel();
                  ++self.failTimes;

                  // Too many failures: give up and remove this proxy.
                  if (self.failTimes > MaxFailTimes)
                  {
                      Log.Info($"actor send message fail, actorid: {self.Id}");
                      Game.Scene.GetComponent<ActorProxyComponent>().Remove(self.Id);
                      return;
                  }

                  // Wait before sending again.
                  await Game.Scene.GetComponent<TimerComponent>().WaitAsync(RetryDelayMilliseconds);
                  if (self.IsDisposed)
                  {
                      return;
                  }

                  int appId = await Game.Scene.GetComponent<LocationProxyComponent>().Get(self.Id);
                  if (self.IsDisposed)
                  {
                      return;
                  }

                  self.Address = Game.Scene.GetComponent<StartConfigComponent>().Get(appId).GetComponent<InnerConfig>().IPEndPoint;
                  self.CancellationTokenSource = new CancellationTokenSource();
                  self.AllowGet();
                  return;
              }

              // Sent successfully.
              self.LastSendTime = TimeHelper.Now();
              self.failTimes = 0;
              self.WaitingTasks.Dequeue();
          }
          catch (Exception e)
          {
              // NOTE(review): the exception is swallowed here, so the catch in UpdateAsync
              // is not reached for failures inside this try, and the task is still at the
              // head of the queue (it is only dequeued on success), so the loop will run
              // it again. Confirm that this is intended.
              Log.Error(e);
          }
      }

      /// <summary>Queues a one-way message; no response is handed back to the caller.</summary>
      public static void Send(this ActorProxy self, IActorMessage message)
      {
          ActorTask task = new ActorTask { message = message, proxy = self };
          self.Add(task);
      }

      /// <summary>
      /// Queues a request and returns the task of the completion source attached to it.
      /// </summary>
      public static Task<IResponse> Call(this ActorProxy self, IActorRequest request)
      {
          ActorTask task = new ActorTask { message = request, proxy = self, Tcs = new TaskCompletionSource<IResponse>() };
          self.Add(task);
          return task.Tcs.Task;
      }

      /// <summary>
      /// Builds a debug string listing the message type name of every task in
      /// <paramref name="tasks"/>, each preceded by a single space.
      /// </summary>
      /// <param name="self">Not used; present so the method can be called on a proxy.</param>
      /// <param name="tasks">The queue to describe.</param>
      /// <returns>The concatenated names, or an empty string for an empty queue.</returns>
      public static string DebugQueue(this ActorProxy self, Queue<ActorTask> tasks)
      {
          // StringBuilder avoids allocating a new string on every iteration.
          var names = new System.Text.StringBuilder();
          foreach (ActorTask task in tasks)
          {
              names.Append(' ').Append(task.message.GetType().Name);
          }

          return names.ToString();
      }
  }
  164. }