NetworkComponent.cs 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using Common.Base;
  5. using Common.Helper;
  6. using Common.Network;
  7. using TNet;
  8. using UNet;
  9. namespace Model
  10. {
  11. public enum RpcResponseStatus
  12. {
  13. Succee,
  14. Timeout,
  15. Exception,
  16. }
  17. public class RpcExcetionInfo
  18. {
  19. public int ErrorCode { get; private set; }
  20. public string ErrorInfo { get; private set; }
  21. public RpcExcetionInfo(int errorCode, string errorInfo)
  22. {
  23. this.ErrorCode = errorCode;
  24. this.ErrorInfo = errorInfo;
  25. }
  26. }
  27. public class NetworkComponent: Component<World>, IUpdate, IStart
  28. {
  29. private IService service;
  30. private int requestId;
  31. private readonly Dictionary<int, Action<byte[], RpcResponseStatus>> requestCallback =
  32. new Dictionary<int, Action<byte[], RpcResponseStatus>>();
  33. private void Accept(string host, int port, NetworkProtocol protocol = NetworkProtocol.TCP)
  34. {
  35. switch (protocol)
  36. {
  37. case NetworkProtocol.TCP:
  38. this.service = new TService(host, port);
  39. break;
  40. case NetworkProtocol.UDP:
  41. this.service = new UService(host, port);
  42. break;
  43. default:
  44. throw new ArgumentOutOfRangeException("protocol");
  45. }
  46. this.AcceptChannel();
  47. }
  48. public void Start()
  49. {
  50. this.Accept(World.Instance.Options.Host, World.Instance.Options.Port,
  51. World.Instance.Options.Protocol);
  52. }
  53. public void Update()
  54. {
  55. this.service.Update();
  56. }
  57. /// <summary>
  58. /// 接收连接
  59. /// </summary>
  60. private async void AcceptChannel()
  61. {
  62. while (true)
  63. {
  64. AChannel channel = await this.service.GetChannel();
  65. this.ProcessChannel(channel);
  66. }
  67. }
  68. /// <summary>
  69. /// 接收分发封包
  70. /// </summary>
  71. /// <param name="channel"></param>
  72. private async void ProcessChannel(AChannel channel)
  73. {
  74. while (true)
  75. {
  76. byte[] message = await channel.RecvAsync();
  77. Env env = new Env();
  78. env[EnvKey.Channel] = channel;
  79. env[EnvKey.Message] = message;
  80. ushort opcode = BitConverter.ToUInt16(message, 0);
  81. env[EnvKey.Opcode] = opcode;
  82. // 表示消息是rpc响应消息
  83. if (opcode == Opcode.RpcResponse)
  84. {
  85. int id = BitConverter.ToInt32(message, 2);
  86. this.RequestCallback(channel, id, message, RpcResponseStatus.Succee);
  87. continue;
  88. }
  89. // rpc异常
  90. if (opcode == Opcode.RpcException)
  91. {
  92. int id = BitConverter.ToInt32(message, 2);
  93. this.RequestCallback(channel, id, message, RpcResponseStatus.Exception);
  94. continue;
  95. }
  96. // 如果是server message(发给client的消息),说明这是gate server,需要根据unitid查到channel,进行发送
  97. if (MessageTypeHelper.IsServerMessage(opcode))
  98. {
  99. World.Instance.GetComponent<EventComponent<EventAttribute>>()
  100. .RunAsync(EventType.GateRecvServerMessage, env);
  101. continue;
  102. }
  103. // 进行消息分发
  104. if (MessageTypeHelper.IsClientMessage(opcode))
  105. {
  106. World.Instance.GetComponent<EventComponent<EventAttribute>>()
  107. .RunAsync(EventType.LogicRecvClientMessage, env);
  108. continue;
  109. }
  110. if (MessageTypeHelper.IsRpcRequestMessage(opcode))
  111. {
  112. World.Instance.GetComponent<EventComponent<EventAttribute>>()
  113. .RunAsync(EventType.LogicRecvRpcMessage, env);
  114. }
  115. }
  116. }
  117. public void SendAsync(string address, byte[] buffer)
  118. {
  119. AChannel channel = this.service.GetChannel(address);
  120. channel.SendAsync(buffer);
  121. }
  122. public void SendAsync(string address, List<byte[]> buffers)
  123. {
  124. AChannel channel = this.service.GetChannel(address);
  125. channel.SendAsync(buffers);
  126. }
  127. // 消息回调或者超时回调
  128. public void RequestCallback(AChannel channel, int id, byte[] buffer, RpcResponseStatus responseStatus)
  129. {
  130. Action<byte[], RpcResponseStatus> action;
  131. if (!this.requestCallback.TryGetValue(id, out action))
  132. {
  133. return;
  134. }
  135. this.requestCallback.Remove(id);
  136. action(buffer, responseStatus);
  137. }
  138. /// <summary>
  139. /// Rpc请求
  140. /// </summary>
  141. public Task<T> RpcRequest<T, K>(string address, short type, K request, int waitTime = 0)
  142. {
  143. AChannel channel = this.service.GetChannel(address);
  144. ++this.requestId;
  145. byte[] requestBuffer = MongoHelper.ToBson(request);
  146. byte[] typeBuffer = BitConverter.GetBytes(type);
  147. byte[] idBuffer = BitConverter.GetBytes(this.requestId);
  148. channel.SendAsync(new List<byte[]> { typeBuffer, idBuffer, requestBuffer });
  149. var tcs = new TaskCompletionSource<T>();
  150. this.requestCallback[this.requestId] = (e, b) =>
  151. {
  152. if (b == RpcResponseStatus.Timeout)
  153. {
  154. tcs.SetException(new Exception(
  155. string.Format("rpc timeout {0} {1}", type, MongoHelper.ToJson(request))));
  156. return;
  157. }
  158. if (b == RpcResponseStatus.Exception)
  159. {
  160. RpcExcetionInfo errorInfo = MongoHelper.FromBson<RpcExcetionInfo>(e, 8);
  161. tcs.SetException(new Exception(
  162. string.Format("rpc exception {0} {1} {2}", type, MongoHelper.ToJson(request), MongoHelper.ToJson(errorInfo))));
  163. return;
  164. }
  165. // RpcResponseStatus.Succee
  166. T response = MongoHelper.FromBson<T>(e, 6);
  167. tcs.SetResult(response);
  168. };
  169. if (waitTime > 0)
  170. {
  171. this.service.Timer.Add(TimeHelper.Now() + waitTime,
  172. () => { this.RequestCallback(channel, this.requestId, null, RpcResponseStatus.Timeout); });
  173. }
  174. return tcs.Task;
  175. }
  176. /// <summary>
  177. /// Rpc响应
  178. /// </summary>
  179. public void RpcResponse<T>(AChannel channel, int id, T response)
  180. {
  181. byte[] responseBuffer = MongoHelper.ToBson(response);
  182. byte[] typeBuffer = BitConverter.GetBytes(Opcode.RpcResponse);
  183. byte[] idBuffer = BitConverter.GetBytes(id);
  184. channel.SendAsync(new List<byte[]> { typeBuffer, idBuffer, responseBuffer });
  185. }
  186. /// <summary>
  187. /// Rpc响应
  188. /// </summary>
  189. public void RpcException(AChannel channel, int id, int errorCode, string errorInfo)
  190. {
  191. byte[] typeBuffer = BitConverter.GetBytes(Opcode.RpcException);
  192. byte[] idBuffer = BitConverter.GetBytes(id);
  193. RpcExcetionInfo info = new RpcExcetionInfo(errorCode, errorInfo);
  194. byte[] responseBuffer = MongoHelper.ToBson(info);
  195. channel.SendAsync(new List<byte[]> { typeBuffer, idBuffer, responseBuffer });
  196. }
  197. }
  198. }