Session.cs 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Net;
  6. namespace ET
  7. {
  /// <summary>
  /// Awake system for <see cref="Session"/>: hands the supplied <see cref="AService"/>
  /// straight to <see cref="Session.Awake(AService)"/>.
  /// </summary>
  /// <remarks>
  /// NOTE(review): presumably discovered through <c>[ObjectSystem]</c> and invoked by the
  /// entity framework when a Session is created with an AService argument; confirm
  /// against the framework's event system.
  /// </remarks>
  [ObjectSystem]
  public class SessionAwakeSystem : AwakeSystem<Session, AService>
  {
      /// <summary>Forwards the awake call to the session instance itself.</summary>
      /// <param name="self">The session being awakened.</param>
      /// <param name="aService">The service the session will send through.</param>
      public override void Awake(Session self, AService aService) => self.Awake(aService);
  }
  /// <summary>
  /// A network session entity bound to an <see cref="AService"/>. It sends messages
  /// through the service, tracks the last send/receive timestamps, and matches RPC
  /// responses to the requests that are still waiting for them.
  /// </summary>
  public sealed class Session : Entity, IAwake<AService>
  {
      /// <summary>
      /// Pairs an outstanding request with the task that is completed when its
      /// response arrives, the call is cancelled, or the session is disposed.
      /// </summary>
      private readonly struct RpcInfo
      {
          public readonly IRequest Request;
          public readonly ETTask<IResponse> Tcs;

          public RpcInfo(IRequest request)
          {
              this.Request = request;
              this.Tcs = ETTask<IResponse>.Create(true);
          }
      }

      /// <summary>The service this session sends through; assigned in <see cref="Awake"/>.</summary>
      public AService AService;

      /// <summary>
      /// Last RPC id handed out. Static, so the counter is shared by every session;
      /// it is pre-incremented for each call.
      /// </summary>
      private static int RpcId { get; set; }

      /// <summary>Requests that have been sent and are still awaiting a response, keyed by RPC id.</summary>
      private readonly Dictionary<int, RpcInfo> requestCallbacks = new Dictionary<int, RpcInfo>();

      /// <summary>Timestamp initialised in <see cref="Awake"/>; not otherwise written by this class.</summary>
      public long LastRecvTime { get; set; }

      /// <summary>Timestamp of the most recent stream send (see <see cref="Send(long, MemoryStream)"/>).</summary>
      public long LastSendTime { get; set; }

      /// <summary>Error code reported to pending RPC callers and logged when the session is disposed.</summary>
      public int Error { get; set; }

      /// <summary>Remote endpoint of the session; only used here in dispose diagnostics.</summary>
      public IPEndPoint RemoteAddress { get; set; }

      /// <summary>
      /// Binds the session to <paramref name="aService"/>, resets both activity
      /// timestamps to the current client time and clears any pending RPC entries.
      /// </summary>
      /// <param name="aService">The service used for all subsequent sends.</param>
      public void Awake(AService aService)
      {
          this.AService = aService;

          long timeNow = TimeHelper.ClientNow();
          this.LastRecvTime = timeNow;
          this.LastSendTime = timeNow;

          this.requestCallbacks.Clear();

          Log.Info($"session create: zone: {this.DomainZone()} id: {this.Id} {timeNow} ");
      }

      /// <summary>
      /// Disposes the entity, removes the session from its service and fails every
      /// pending RPC with an <c>RpcException</c> carrying <see cref="Error"/>.
      /// Calling it again on an already disposed session is a no-op.
      /// </summary>
      public override void Dispose()
      {
          if (this.IsDisposed)
          {
              return;
          }

          // Capture identity before base.Dispose() so nothing below depends on
          // entity state after disposal.
          int zone = this.DomainZone();
          long id = this.Id;

          base.Dispose();

          // Use the captured id (not this.Id) so the removal targets the same id that
          // is logged below, regardless of what base.Dispose() does to entity state.
          this.AService.Remove(id);

          // Snapshot the values: a continuation run by SetException may touch the dictionary.
          foreach (RpcInfo responseCallback in this.requestCallbacks.Values.ToArray())
          {
              responseCallback.Tcs.SetException(new RpcException(this.Error,
                  $"session dispose: {id} {this.RemoteAddress}"));
          }

          Log.Info(
              $"session dispose: {this.RemoteAddress} zone: {zone} id: {id} ErrorCode: {this.Error}, please see ErrorCode.cs! {TimeHelper.ClientNow()}");

          this.requestCallbacks.Clear();
      }

      /// <summary>
      /// Completes the pending RPC that matches <paramref name="response"/>. A response
      /// whose RpcId has no pending entry is logged and otherwise ignored.
      /// </summary>
      /// <param name="opcode">Opcode of the received message; used for logging only.</param>
      /// <param name="response">The received response.</param>
      public void OnRead(ushort opcode, IResponse response)
      {
          OpcodeHelper.LogMsg(this.DomainZone(), opcode, response);

          if (!this.requestCallbacks.TryGetValue(response.RpcId, out var action))
          {
              return;
          }

          this.requestCallbacks.Remove(response.RpcId);

          // Error codes that ErrorCore marks as "need throw" fault the awaiting caller
          // instead of being returned as a normal response.
          if (ErrorCore.IsRpcNeedThrowException(response.Error))
          {
              action.Tcs.SetException(new Exception($"Rpc error, request: {action.Request} response: {response}"));
              return;
          }

          action.Tcs.SetResult(response);
      }

      /// <summary>
      /// Sends <paramref name="request"/> and waits for its response, with optional cancellation.
      /// </summary>
      /// <param name="request">The request to send; its RpcId is overwritten with a fresh id.</param>
      /// <param name="cancellationToken">
      /// Optional token. When it fires while the call is still pending, the call completes
      /// with a newly created response of the request's response type whose Error is
      /// <c>ErrorCore.ERR_Cancel</c>. May be null.
      /// </param>
      /// <returns>The matching response, or the synthesized cancel response.</returns>
      public async ETTask<IResponse> Call(IRequest request, ETCancellationToken cancellationToken)
      {
          RpcInfo rpcInfo = this.SendRequest(request, out int rpcId);

          void CancelAction()
          {
              // Nothing to do if the call already completed or the session was disposed.
              if (!this.requestCallbacks.TryGetValue(rpcId, out RpcInfo action))
              {
                  return;
              }

              this.requestCallbacks.Remove(rpcId);

              Type responseType = OpcodeTypeComponent.Instance.GetResponseType(action.Request.GetType());
              IResponse response = (IResponse)Activator.CreateInstance(responseType);
              response.Error = ErrorCore.ERR_Cancel;
              action.Tcs.SetResult(response);
          }

          try
          {
              cancellationToken?.Add(CancelAction);
              return await rpcInfo.Tcs;
          }
          finally
          {
              // Always unhook, so the token does not keep a reference to a finished call.
              cancellationToken?.Remove(CancelAction);
          }
      }

      /// <summary>
      /// Sends <paramref name="request"/> and waits for its response.
      /// </summary>
      /// <param name="request">The request to send; its RpcId is overwritten with a fresh id.</param>
      /// <returns>The matching response.</returns>
      public async ETTask<IResponse> Call(IRequest request)
      {
          RpcInfo rpcInfo = this.SendRequest(request, out _);
          return await rpcInfo.Tcs;
      }

      /// <summary>
      /// Shared first half of both Call overloads: allocates the next RPC id, registers
      /// the pending entry, stamps the id on the request and sends it.
      /// </summary>
      /// <param name="request">The request to register and send.</param>
      /// <param name="rpcId">Receives the id assigned to this call.</param>
      /// <returns>The pending-call record whose task the caller awaits.</returns>
      private RpcInfo SendRequest(IRequest request, out int rpcId)
      {
          rpcId = ++RpcId;
          RpcInfo rpcInfo = new RpcInfo(request);

          this.requestCallbacks[rpcId] = rpcInfo;
          request.RpcId = rpcId;

          try
          {
              this.Send(request);
          }
          catch
          {
              // The request never left, so no response can arrive for it: drop the
              // pending entry instead of leaving it registered until the session is disposed.
              this.requestCallbacks.Remove(rpcId);
              throw;
          }

          return rpcInfo;
      }

      /// <summary>Sends a response message with actor id 0.</summary>
      /// <param name="message">The response to send.</param>
      public void Reply(IResponse message)
      {
          this.Send(0, message);
      }

      /// <summary>Sends a message with actor id 0.</summary>
      /// <param name="message">The message to send.</param>
      public void Send(IMessage message)
      {
          this.Send(0, message);
      }

      /// <summary>
      /// Serializes <paramref name="message"/> with <c>MessageSerializeHelper.MessageToStream</c>,
      /// logs it, and sends the resulting stream.
      /// </summary>
      /// <param name="actorId">Actor id passed through to the service.</param>
      /// <param name="message">The message to serialize and send.</param>
      public void Send(long actorId, IMessage message)
      {
          (ushort opcode, MemoryStream stream) = MessageSerializeHelper.MessageToStream(message);
          OpcodeHelper.LogMsg(this.DomainZone(), opcode, message);
          this.Send(actorId, stream);
      }

      /// <summary>Sends an already serialized stream with actor id 0.</summary>
      /// <param name="memoryStream">The serialized message.</param>
      public void Send(MemoryStream memoryStream)
      {
          this.Send(0, memoryStream);
      }

      /// <summary>
      /// Records the send time and passes the stream to the service under this session's id.
      /// </summary>
      /// <param name="actorId">Actor id passed through to the service.</param>
      /// <param name="memoryStream">The serialized message.</param>
      public void Send(long actorId, MemoryStream memoryStream)
      {
          this.LastSendTime = TimeHelper.ClientNow();
          this.AService.Send(this.Id, actorId, memoryStream);
      }
  }
  143. }