Session.cs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using ETModel;
  7. namespace ETHotfix
  8. {
  /// <summary>
  /// Awake system for the hotfix-layer <see cref="Session"/>. It stores the wrapped
  /// <see cref="ETModel.Session"/> and attaches a <c>SessionCallbackComponent</c> to it so that
  /// incoming packets and disposal of the wrapped session are forwarded to the hotfix session.
  /// </summary>
  [ObjectSystem]
  public class SessionAwakeSystem : AwakeSystem<Session, ETModel.Session>
  {
      /// <summary>
      /// Binds <paramref name="self"/> to <paramref name="session"/> and installs the forwarding callbacks.
      /// </summary>
      /// <param name="self">The hotfix session being awakened.</param>
      /// <param name="session">The model-layer session it wraps.</param>
      public override void Awake(Session self, ETModel.Session session)
      {
          self.session = session;

          SessionCallbackComponent callbacks = session.AddComponent<SessionCallbackComponent>();

          // Disposal of the wrapped session disposes the hotfix wrapper as well.
          callbacks.DisposeCallback = disposed => self.Dispose();

          // Every packet reported by the wrapped session is handled by the hotfix wrapper.
          callbacks.MessageCallback = (sender, packet) => self.Run(sender, packet);
      }
  }
  /// <summary>
  /// Sends and receives hotfix-layer messages by wrapping an <see cref="ETModel.Session"/>.
  /// </summary>
  public class Session: Entity
  {
      /// <summary>The wrapped model-layer session; all sends are delegated to it.</summary>
      public ETModel.Session session;

      /// <summary>Last RPC id handed out. Static, so the counter is shared by every hotfix session.</summary>
      private static int RpcId { get; set; }

      /// <summary>Callbacks of RPC requests still waiting for a response, keyed by RPC id.</summary>
      private readonly Dictionary<int, Action<IResponse>> requestCallback = new Dictionary<int, Action<IResponse>>();

      /// <summary>
      /// Disposes this entity, completes every pending RPC with
      /// <c>ErrorCode.ERR_SessionDispose</c>, and disposes the wrapped session.
      /// </summary>
      public override void Dispose()
      {
          if (this.IsDisposed)
          {
              return;
          }

          base.Dispose();

          // Iterate over a snapshot: a callback may run caller continuations that touch the dictionary.
          foreach (Action<IResponse> action in this.requestCallback.Values.ToArray())
          {
              action.Invoke(new ResponseMessage { Error = ErrorCode.ERR_SessionDispose });
          }

          this.requestCallback.Clear();
          this.session.Dispose();
      }

      /// <summary>
      /// Decodes an incoming packet. A packet whose flag has bit 0x01 set is treated as an RPC
      /// response and routed to the matching pending callback; any other packet is passed to
      /// <c>MessageDispatherComponent</c>.
      /// </summary>
      /// <param name="s">The model-layer session that reported the packet (not used by this method).</param>
      /// <param name="packet">The raw packet carrying opcode, flag and payload bytes.</param>
      /// <exception cref="Exception">The flag marks a response but the decoded message is not an <c>IResponse</c>.</exception>
      public void Run(ETModel.Session s, Packet packet)
      {
          ushort opcode = packet.Opcode;
          byte flag = packet.Flag;

          OpcodeTypeComponent opcodeTypeComponent = Game.Scene.GetComponent<OpcodeTypeComponent>();
          Type responseType = opcodeTypeComponent.GetType(opcode);
          object message = ProtobufHelper.FromBytes(responseType, packet.Bytes, packet.Offset, packet.Length);

          if ((flag & 0x01) > 0)
          {
              IResponse response = message as IResponse;
              if (response == null)
              {
                  throw new Exception($"flag is response, but hotfix message is not! {opcode}");
              }

              Action<IResponse> action;
              if (!this.requestCallback.TryGetValue(response.RpcId, out action))
              {
                  // No pending request for this id (e.g. already cancelled or completed): drop it.
                  return;
              }

              // Remove before invoking so the callback runs at most once from this path.
              this.requestCallback.Remove(response.RpcId);
              action(response);
              return;
          }

          Game.Scene.GetComponent<MessageDispatherComponent>().Handle(session, new MessageInfo(opcode, message));
      }

      /// <summary>Sends a message with flag 0x00.</summary>
      /// <param name="message">The message to serialize and send.</param>
      public void Send(IMessage message)
      {
          Send(0x00, message);
      }

      /// <summary>
      /// Looks up the opcode for the message's runtime type, serializes the message and sends it
      /// through the wrapped session.
      /// </summary>
      /// <param name="flag">Packet flag byte forwarded unchanged to the wrapped session.</param>
      /// <param name="message">The message to serialize and send.</param>
      public void Send(byte flag, IMessage message)
      {
          ushort opcode = Game.Scene.GetComponent<OpcodeTypeComponent>().GetOpcode(message.GetType());
          byte[] bytes = ProtobufHelper.ToBytes(message);
          session.Send(flag, opcode, bytes);
      }

      /// <summary>Sends already-serialized bytes through the wrapped session.</summary>
      /// <param name="flag">Packet flag byte.</param>
      /// <param name="opcode">Opcode identifying the message type.</param>
      /// <param name="bytes">Serialized message payload.</param>
      public void Send(byte flag, ushort opcode, byte[] bytes)
      {
          session.Send(flag, opcode, bytes);
      }

      /// <summary>
      /// Sends an RPC request and returns a task that completes with its response.
      /// Equivalent to calling the cancellable overload with <see cref="CancellationToken.None"/>.
      /// </summary>
      /// <param name="request">The request; its <c>RpcId</c> is overwritten with a freshly allocated id.</param>
      /// <returns>
      /// A task that yields the response, or faults with an <see cref="Exception"/> wrapping an
      /// <c>RpcException</c> when the response's error code is greater than <c>ErrorCode.ERR_Exception</c>.
      /// </returns>
      public Task<IResponse> Call(IRequest request)
      {
          return this.Call(request, CancellationToken.None);
      }

      /// <summary>
      /// Sends an RPC request and returns a task that completes with its response, or is
      /// cancelled when <paramref name="cancellationToken"/> fires before a response arrives.
      /// </summary>
      /// <param name="request">The request; its <c>RpcId</c> is overwritten with a freshly allocated id.</param>
      /// <param name="cancellationToken">Token that abandons the wait for the response.</param>
      /// <returns>
      /// A task that yields the response; faults with an <see cref="Exception"/> wrapping an
      /// <c>RpcException</c> when the response's error code is greater than <c>ErrorCode.ERR_Exception</c>;
      /// or transitions to the canceled state when the token is cancelled while the request is pending.
      /// </returns>
      public Task<IResponse> Call(IRequest request, CancellationToken cancellationToken)
      {
          int rpcId = ++RpcId;
          var tcs = new TaskCompletionSource<IResponse>();

          // Declared before the callback so the callback can release it once the RPC finishes;
          // otherwise a long-lived token would keep this closure alive indefinitely.
          CancellationTokenRegistration registration = default(CancellationTokenRegistration);

          this.requestCallback[rpcId] = (response) =>
          {
              registration.Dispose();

              try
              {
                  if (response.Error > ErrorCode.ERR_Exception)
                  {
                      throw new RpcException(response.Error, response.Message);
                  }

                  // Try* variants: the task may already have been cancelled, and completing it
                  // twice must not throw out of Run/Dispose.
                  tcs.TrySetResult(response);
              }
              catch (Exception e)
              {
                  tcs.TrySetException(new Exception($"Rpc Error: {request.GetType().FullName}", e));
              }
          };

          registration = cancellationToken.Register(() =>
          {
              // Only cancel when the request was still pending; complete the task so awaiting
              // callers are released instead of waiting forever.
              if (this.requestCallback.Remove(rpcId))
              {
                  tcs.TrySetCanceled();
              }
          });

          // NOTE(review): with an already-cancelled token the request is still sent (as before);
          // its response is then ignored by Run because no callback is registered.
          request.RpcId = rpcId;
          this.Send(0x00, request);
          return tcs.Task;
      }
  }
  129. }