Session.cs 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Net;
  6. namespace ET
  7. {
  8. [ObjectSystem]
  9. public class SessionAwakeSystem: AwakeSystem<Session, AService>
  10. {
  11. public override void Awake(Session self, AService aService)
  12. {
  13. self.Awake(aService);
  14. }
  15. }
  16. public sealed class Session: Entity, IAwake<AService>
  17. {
  18. private readonly struct RpcInfo
  19. {
  20. public readonly IRequest Request;
  21. public readonly ETTask<IResponse> Tcs;
  22. public RpcInfo(IRequest request)
  23. {
  24. this.Request = request;
  25. this.Tcs = ETTask<IResponse>.Create(true);
  26. }
  27. }
  28. public AService AService;
  29. private static int RpcId
  30. {
  31. get;
  32. set;
  33. }
  34. private readonly Dictionary<int, RpcInfo> requestCallbacks = new Dictionary<int, RpcInfo>();
  35. public long LastRecvTime
  36. {
  37. get;
  38. set;
  39. }
  40. public long LastSendTime
  41. {
  42. get;
  43. set;
  44. }
  45. public int Error
  46. {
  47. get;
  48. set;
  49. }
  50. public void Awake(AService aService)
  51. {
  52. this.AService = aService;
  53. long timeNow = TimeHelper.ClientNow();
  54. this.LastRecvTime = timeNow;
  55. this.LastSendTime = timeNow;
  56. this.requestCallbacks.Clear();
  57. Log.Info($"session create: zone: {this.DomainZone()} id: {this.Id} {timeNow} ");
  58. }
  59. public override void Dispose()
  60. {
  61. if (this.IsDisposed)
  62. {
  63. return;
  64. }
  65. int zone = this.DomainZone();
  66. long id = this.Id;
  67. base.Dispose();
  68. this.AService.RemoveChannel(this.Id);
  69. foreach (RpcInfo responseCallback in this.requestCallbacks.Values.ToArray())
  70. {
  71. responseCallback.Tcs.SetException(new RpcException(this.Error, $"session dispose: {id} {this.RemoteAddress}"));
  72. }
  73. Log.Info($"session dispose: {this.RemoteAddress} zone: {zone} id: {id} ErrorCode: {this.Error}, please see ErrorCode.cs! {TimeHelper.ClientNow()}");
  74. this.requestCallbacks.Clear();
  75. }
  76. public IPEndPoint RemoteAddress
  77. {
  78. get;
  79. set;
  80. }
  81. public void OnRead(ushort opcode, IResponse response)
  82. {
  83. OpcodeHelper.LogMsg(this.DomainZone(), opcode, response);
  84. if (!this.requestCallbacks.TryGetValue(response.RpcId, out var action))
  85. {
  86. return;
  87. }
  88. this.requestCallbacks.Remove(response.RpcId);
  89. if (ErrorCore.IsRpcNeedThrowException(response.Error))
  90. {
  91. action.Tcs.SetException(new Exception($"Rpc error, request: {action.Request} response: {response}"));
  92. return;
  93. }
  94. action.Tcs.SetResult(response);
  95. }
  96. public async ETTask<IResponse> Call(IRequest request, ETCancellationToken cancellationToken)
  97. {
  98. int rpcId = ++RpcId;
  99. RpcInfo rpcInfo = new RpcInfo(request);
  100. this.requestCallbacks[rpcId] = rpcInfo;
  101. request.RpcId = rpcId;
  102. this.Send(request);
  103. void CancelAction()
  104. {
  105. if (!this.requestCallbacks.TryGetValue(rpcId, out RpcInfo action))
  106. {
  107. return;
  108. }
  109. this.requestCallbacks.Remove(rpcId);
  110. Type responseType = OpcodeTypeComponent.Instance.GetResponseType(action.Request.GetType());
  111. IResponse response = (IResponse) Activator.CreateInstance(responseType);
  112. response.Error = ErrorCore.ERR_Cancel;
  113. action.Tcs.SetResult(response);
  114. }
  115. IResponse ret;
  116. try
  117. {
  118. cancellationToken?.Add(CancelAction);
  119. ret = await rpcInfo.Tcs;
  120. }
  121. finally
  122. {
  123. cancellationToken?.Remove(CancelAction);
  124. }
  125. return ret;
  126. }
  127. public async ETTask<IResponse> Call(IRequest request)
  128. {
  129. int rpcId = ++RpcId;
  130. RpcInfo rpcInfo = new RpcInfo(request);
  131. this.requestCallbacks[rpcId] = rpcInfo;
  132. request.RpcId = rpcId;
  133. this.Send(request);
  134. return await rpcInfo.Tcs;
  135. }
  136. public void Reply(IResponse message)
  137. {
  138. this.Send(0, message);
  139. }
  140. public void Send(IMessage message)
  141. {
  142. this.Send(0, message);
  143. }
  144. public void Send(long actorId, IMessage message)
  145. {
  146. (ushort opcode, MemoryStream stream) = MessageSerializeHelper.MessageToStream(message);
  147. OpcodeHelper.LogMsg(this.DomainZone(), opcode, message);
  148. this.Send(actorId, stream);
  149. }
  150. public void Send(long actorId, MemoryStream memoryStream)
  151. {
  152. this.LastSendTime = TimeHelper.ClientNow();
  153. this.AService.SendStream(this.Id, actorId, memoryStream);
  154. }
  155. }
  156. }