Session.cs 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Net;
  6. namespace ET
  7. {
  /// <summary>
  /// Immutable pair of an outgoing request and the task through which its response is delivered.
  /// </summary>
  public readonly struct RpcInfo
  {
      /// <summary>The request this entry was created for.</summary>
      public readonly IRequest Request;

      /// <summary>Task that the owner of this entry completes with a response or an exception.</summary>
      public readonly ETTask<IResponse> Tcs;

      /// <summary>
      /// Stores <paramref name="request"/> and creates a new task for its response.
      /// </summary>
      /// <param name="request">The request being tracked.</param>
      public RpcInfo(IRequest request)
      {
          // NOTE(review): the `true` argument presumably asks for a pooled task - confirm against ETTask.Create.
          Tcs = ETTask<IResponse>.Create(true);
          Request = request;
      }
  }
  18. [FriendOf(typeof(Session))]
  19. public static class SessionSystem
  20. {
  /// <summary>
  /// Awake handler for <see cref="Session"/>: runs when a session is awakened with an <see cref="AService"/>.
  /// </summary>
  [ObjectSystem]
  public class SessionAwakeSystem: AwakeSystem<Session, AService>
  {
      /// <summary>
      /// Empties the RPC callback table, binds the session to <paramref name="aService"/> and
      /// sets both activity timestamps to the current client time.
      /// </summary>
      /// <param name="self">Session being awakened.</param>
      /// <param name="aService">Service stored on the session for later sends and channel removal.</param>
      public override void Awake(Session self, AService aService)
      {
          self.requestCallbacks.Clear();
          self.AService = aService;

          // One clock read so the send/receive stamps and the log line all agree.
          long timeNow = TimeHelper.ClientNow();
          self.LastSendTime = timeNow;
          self.LastRecvTime = timeNow;

          Log.Info($"session create: zone: {self.DomainZone()} id: {self.Id} {timeNow} ");
      }
  }
  /// <summary>
  /// Destroy handler for <see cref="Session"/>: releases the channel and fails every pending RPC.
  /// </summary>
  [ObjectSystem]
  public class SessionDestroySystem: DestroySystem<Session>
  {
      /// <summary>
      /// Removes the session's channel from its service, faults all outstanding calls with an
      /// <c>RpcException</c> carrying the session's error code, logs the disposal and clears the table.
      /// </summary>
      /// <param name="self">Session being destroyed.</param>
      public override void Destroy(Session self)
      {
          self.AService.RemoveChannel(self.Id);

          // Work on a copy of the values - presumably so that continuations which touch
          // requestCallbacks while being faulted cannot invalidate the iteration.
          RpcInfo[] pending = self.requestCallbacks.Values.ToArray();
          for (int i = 0; i < pending.Length; ++i)
          {
              pending[i].Tcs.SetException(new RpcException(self.Error, $"session dispose: {self.Id} {self.RemoteAddress}"));
          }

          Log.Info($"session dispose: {self.RemoteAddress} id: {self.Id} ErrorCode: {self.Error}, please see ErrorCode.cs! {TimeHelper.ClientNow()}");

          self.requestCallbacks.Clear();
      }
  }
  /// <summary>
  /// Routes an incoming response to the pending call that is waiting for it.
  /// </summary>
  /// <param name="self">Session that received the response.</param>
  /// <param name="opcode">Opcode of the response; used only for message logging here.</param>
  /// <param name="response">The response; its <c>RpcId</c> selects the pending call.</param>
  /// <remarks>
  /// A response whose <c>RpcId</c> has no pending entry is ignored. Otherwise the entry is removed and its
  /// task is either completed with the response or faulted, depending on
  /// <c>ErrorCore.IsRpcNeedThrowException(response.Error)</c>.
  /// </remarks>
  public static void OnRead(this Session self, ushort opcode, IResponse response)
  {
      OpcodeHelper.LogMsg(self.DomainZone(), opcode, response);

      int rpcId = response.RpcId;
      if (!self.requestCallbacks.TryGetValue(rpcId, out RpcInfo action))
      {
          // Nothing is waiting for this id.
          return;
      }

      // Unregister before completing the task so the entry is gone by the time continuations run.
      self.requestCallbacks.Remove(rpcId);

      if (!ErrorCore.IsRpcNeedThrowException(response.Error))
      {
          action.Tcs.SetResult(response);
          return;
      }

      action.Tcs.SetException(new Exception($"Rpc error, request: {action.Request} response: {response}"));
  }
  /// <summary>
  /// Sends <paramref name="request"/> over the session and awaits its response, with optional cancellation.
  /// </summary>
  /// <param name="self">Session the request is sent on.</param>
  /// <param name="request">Request to send; its <c>RpcId</c> is overwritten with a newly allocated id.</param>
  /// <param name="cancellationToken">
  /// Optional token, may be <c>null</c>. While the call is pending a cancel action is registered on it;
  /// when that action runs, the call completes with a newly created response whose <c>Error</c> is
  /// <c>ErrorCore.ERR_Cancel</c>.
  /// </param>
  /// <returns>The response that completes the pending task (a real one, or the synthesized cancel response).</returns>
  /// <remarks>
  /// If sending throws, the pending entry is removed again before the exception propagates, so a failed
  /// send does not leave a stale callback in <c>requestCallbacks</c>.
  /// </remarks>
  public static async ETTask<IResponse> Call(this Session self, IRequest request, ETCancellationToken cancellationToken)
  {
      int rpcId = ++Session.RpcId;
      RpcInfo rpcInfo = new RpcInfo(request);

      // Register before sending so the entry already exists when the response is read.
      self.requestCallbacks[rpcId] = rpcInfo;
      request.RpcId = rpcId;

      try
      {
          self.Send(request);
      }
      catch
      {
          // This call faults right here, so nobody will ever await the pending entry;
          // drop it instead of leaving it in the table until the session is destroyed.
          self.requestCallbacks.Remove(rpcId);
          throw;
      }

      void CancelAction()
      {
          if (!self.requestCallbacks.TryGetValue(rpcId, out RpcInfo action))
          {
              // Already completed or removed - nothing to cancel.
              return;
          }

          self.requestCallbacks.Remove(rpcId);

          // Build an empty response of the type paired with this request and mark it as cancelled.
          Type responseType = OpcodeTypeComponent.Instance.GetResponseType(action.Request.GetType());
          IResponse response = (IResponse) Activator.CreateInstance(responseType);
          response.Error = ErrorCore.ERR_Cancel;
          action.Tcs.SetResult(response);
      }

      try
      {
          cancellationToken?.Add(CancelAction);
          return await rpcInfo.Tcs;
      }
      finally
      {
          // Always unregister, whether the call completed, was cancelled, or faulted.
          cancellationToken?.Remove(CancelAction);
      }
  }
  /// <summary>
  /// Sends <paramref name="request"/> over the session and awaits its response.
  /// </summary>
  /// <param name="self">Session the request is sent on.</param>
  /// <param name="request">Request to send; its <c>RpcId</c> is overwritten with a newly allocated id.</param>
  /// <returns>The response that completes the pending task for this request.</returns>
  /// <remarks>
  /// If sending throws, the pending entry is removed again before the exception propagates, so a failed
  /// send does not leave a stale callback in <c>requestCallbacks</c>.
  /// </remarks>
  public static async ETTask<IResponse> Call(this Session self, IRequest request)
  {
      int rpcId = ++Session.RpcId;
      RpcInfo rpcInfo = new RpcInfo(request);

      // Register before sending so the entry already exists when the response is read.
      self.requestCallbacks[rpcId] = rpcInfo;
      request.RpcId = rpcId;

      try
      {
          self.Send(request);
      }
      catch
      {
          // This call faults right here, so nobody will ever await the pending entry;
          // drop it instead of leaving it in the table until the session is destroyed.
          self.requestCallbacks.Remove(rpcId);
          throw;
      }

      return await rpcInfo.Tcs;
  }
  /// <summary>
  /// Sends <paramref name="message"/> as a reply on this session, using actor id 0.
  /// </summary>
  /// <param name="self">Session to send on.</param>
  /// <param name="message">Response to send.</param>
  public static void Reply(this Session self, IResponse message) => self.Send(0, message);
  /// <summary>
  /// Sends <paramref name="message"/> on this session, using actor id 0.
  /// </summary>
  /// <param name="self">Session to send on.</param>
  /// <param name="message">Message to serialize and send.</param>
  public static void Send(this Session self, IMessage message) => self.Send(0, message);
  /// <summary>
  /// Serializes <paramref name="message"/>, logs it, and sends the resulting stream to <paramref name="actorId"/>.
  /// </summary>
  /// <param name="self">Session to send on.</param>
  /// <param name="actorId">Actor id forwarded unchanged to the stream-level send.</param>
  /// <param name="message">Message to serialize.</param>
  public static void Send(this Session self, long actorId, IMessage message)
  {
      // Serialization yields both the opcode (needed for logging) and the payload stream.
      (ushort messageOpcode, MemoryStream payload) = MessageSerializeHelper.MessageToStream(message);

      OpcodeHelper.LogMsg(self.DomainZone(), messageOpcode, message);

      self.Send(actorId, payload);
  }
  /// <summary>
  /// Records the send time and hands an already serialized stream to the session's service.
  /// </summary>
  /// <param name="self">Session to send on; its <c>Id</c> is passed to the service along with the stream.</param>
  /// <param name="actorId">Actor id passed through to the service.</param>
  /// <param name="memoryStream">Serialized message bytes.</param>
  public static void Send(this Session self, long actorId, MemoryStream memoryStream)
  {
      // Stamp first: the timestamp is updated even if the service call below throws.
      long sentAt = TimeHelper.ClientNow();
      self.LastSendTime = sentAt;

      self.AService.SendStream(self.Id, actorId, memoryStream);
  }
  122. }
  /// <summary>
  /// Entity holding the state of one session: the service it belongs to, its pending RPC calls,
  /// activity timestamps, last error code and remote endpoint. Behaviour lives in <c>SessionSystem</c>.
  /// </summary>
  public sealed class Session: Entity, IAwake<AService>, IDestroy
  {
      /// <summary>Service this session sends through; assigned in Awake.</summary>
      public AService AService { get; set; }

      /// <summary>
      /// Last RPC id handed out. Static, so the counter is shared by every session in the process.
      /// </summary>
      public static int RpcId { get; set; }

      /// <summary>Pending calls keyed by RPC id; entries are removed when a response arrives or a call is cancelled.</summary>
      public readonly Dictionary<int, RpcInfo> requestCallbacks = new Dictionary<int, RpcInfo>();

      /// <summary>Time of the last receive; initialised to the client time in Awake.</summary>
      public long LastRecvTime { get; set; }

      /// <summary>Time of the last send; refreshed on every stream-level send.</summary>
      public long LastSendTime { get; set; }

      /// <summary>Error code reported to pending calls and in the log when the session is destroyed.</summary>
      public int Error { get; set; }

      /// <summary>Remote endpoint of the session; used in disposal log and exception messages.</summary>
      public IPEndPoint RemoteAddress { get; set; }
  }
  153. }