// NetworkComponent.cs (6.7 KB)
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Runtime.Serialization;
  5. using System.Runtime.Serialization.Formatters.Binary;
  6. using System.Threading.Tasks;
  7. using Common.Base;
  8. using Common.Helper;
  9. using Common.Network;
  10. using TNet;
  11. using UNet;
  12. namespace Model
  13. {
  /// <summary>
  /// Outcome handed to an RPC request callback.
  /// </summary>
  public enum RpcResponseStatus
  {
      /// <summary>A message with the RPC response opcode arrived for the request.</summary>
      Succee = 0,

      /// <summary>The wait time of the request elapsed before any answer was dispatched.</summary>
      Timeout = 1,

      /// <summary>A message with the RPC exception opcode arrived for the request.</summary>
      Exception = 2,
  }
  /// <summary>
  /// Carries an error code together with its textual description.
  /// Both values are assigned once in the constructor and are read-only from outside the class.
  /// </summary>
  public class RpcExcetionInfo
  {
      /// <summary>
      /// Creates a new error description.
      /// </summary>
      /// <param name="errorCode">Numeric error code to expose through <see cref="ErrorCode"/>.</param>
      /// <param name="errorInfo">Error text to expose through <see cref="ErrorInfo"/>.</param>
      public RpcExcetionInfo(int errorCode, string errorInfo)
      {
          ErrorCode = errorCode;
          ErrorInfo = errorInfo;
      }

      /// <summary>Numeric error code supplied at construction.</summary>
      public int ErrorCode { get; private set; }

      /// <summary>Error text supplied at construction.</summary>
      public string ErrorInfo { get; private set; }
  }
  /// <summary>
  /// World component that owns the network service. It accepts channels from the service,
  /// reads messages from every accepted channel and dispatches them by opcode, and offers
  /// helpers for sending raw buffers and for RPC request / response / exception messages.
  /// </summary>
  /// <remarks>
  /// RPC messages handled here are laid out as: 2-byte opcode, 4-byte request id, payload.
  /// </remarks>
  public class NetworkComponent: Component<World>, IUpdate, IStart
  {
      // Offset of the 4-byte request id inside an RPC message (right after the 2-byte opcode).
      private const int RpcIdOffset = 2;

      // Offset of the payload inside an RPC message (2-byte opcode + 4-byte request id).
      private const int RpcHeaderSize = 6;

      private IService service;

      // Last request id handed out by RpcRequest; incremented for every request.
      private int requestId;

      // Pending RPC callbacks keyed by request id; an entry is removed right before it is invoked.
      private readonly Dictionary<int, Action<byte[], RpcResponseStatus>> requestCallback =
              new Dictionary<int, Action<byte[], RpcResponseStatus>>();

      /// <summary>
      /// Creates the service for the given protocol and starts accepting channels on it.
      /// </summary>
      /// <param name="host">Host passed to the service constructor.</param>
      /// <param name="port">Port passed to the service constructor.</param>
      /// <param name="protocol">Selects <c>TService</c> (TCP) or <c>UService</c> (UDP).</param>
      /// <exception cref="ArgumentOutOfRangeException">The protocol is neither TCP nor UDP.</exception>
      private void Accept(string host, int port, NetworkProtocol protocol = NetworkProtocol.TCP)
      {
          switch (protocol)
          {
              case NetworkProtocol.TCP:
                  this.service = new TService(host, port);
                  break;
              case NetworkProtocol.UDP:
                  this.service = new UService(host, port);
                  break;
              default:
                  throw new ArgumentOutOfRangeException("protocol");
          }

          this.AcceptChannel();
      }

      /// <summary>
      /// Starts accepting with the host, port and protocol taken from <c>World.Instance.Options</c>.
      /// </summary>
      public void Start()
      {
          this.Accept(World.Instance.Options.Host, World.Instance.Options.Port,
                  World.Instance.Options.Protocol);
      }

      /// <summary>
      /// Forwards the update tick to the network service.
      /// </summary>
      public void Update()
      {
          this.service.Update();
      }

      /// <summary>
      /// Accepts connections: waits for the next channel from the service, forever, and starts
      /// a receive loop for each one.
      /// </summary>
      /// <remarks>
      /// Fire-and-forget (<c>async void</c>): the caller cannot observe an exception thrown here.
      /// </remarks>
      private async void AcceptChannel()
      {
          while (true)
          {
              AChannel channel = await this.service.GetChannel();
              this.ProcessChannel(channel);
          }
      }

      /// <summary>
      /// Receives packets from a channel and dispatches each one according to its opcode.
      /// </summary>
      /// <param name="channel">Channel to read messages from.</param>
      /// <remarks>
      /// Fire-and-forget (<c>async void</c>): the caller cannot observe an exception thrown here.
      /// </remarks>
      private async void ProcessChannel(AChannel channel)
      {
          while (true)
          {
              byte[] message = await channel.RecvAsync();

              Env env = new Env();
              env[EnvKey.Channel] = channel;
              env[EnvKey.Message] = message;
              ushort opcode = BitConverter.ToUInt16(message, 0);
              env[EnvKey.Opcode] = opcode;

              // The message is an RPC response: complete the pending request.
              if (opcode == Opcode.RpcResponse)
              {
                  int id = BitConverter.ToInt32(message, RpcIdOffset);
                  this.RequestCallback(channel, id, message, RpcResponseStatus.Succee);
                  continue;
              }

              // The message is an RPC exception: fail the pending request.
              if (opcode == Opcode.RpcException)
              {
                  int id = BitConverter.ToInt32(message, RpcIdOffset);
                  this.RequestCallback(channel, id, message, RpcResponseStatus.Exception);
                  continue;
              }

              // A server message (one addressed to a client) means this is the gate server;
              // the target channel has to be looked up by unit id and the message forwarded.
              if (MessageTypeHelper.IsServerMessage(opcode))
              {
  #pragma warning disable 4014
                  World.Instance.GetComponent<EventComponent<EventAttribute>>()
                          .RunAsync(EventType.GateRecvServerMessage, env);
  #pragma warning restore 4014
                  continue;
              }

              // Dispatch a client message to the logic handlers.
              if (MessageTypeHelper.IsClientMessage(opcode))
              {
  #pragma warning disable 4014
                  World.Instance.GetComponent<EventComponent<EventAttribute>>()
                          .RunAsync(EventType.LogicRecvClientMessage, env);
  #pragma warning restore 4014
                  continue;
              }

              // Dispatch an incoming RPC request; any other opcode is ignored.
              if (MessageTypeHelper.IsRpcRequestMessage(opcode))
              {
  #pragma warning disable 4014
                  World.Instance.GetComponent<EventComponent<EventAttribute>>()
                          .RunAsync(EventType.LogicRecvRequestMessage, env);
  #pragma warning restore 4014
              }
          }
      }

      /// <summary>
      /// Sends one buffer over the channel the service returns for <paramref name="address"/>.
      /// </summary>
      /// <param name="address">Address used to look up the channel.</param>
      /// <param name="buffer">Bytes to send.</param>
      public void SendAsync(string address, byte[] buffer)
      {
          AChannel channel = this.service.GetChannel(address);
          channel.SendAsync(buffer);
      }

      /// <summary>
      /// Sends a list of buffers over the channel the service returns for <paramref name="address"/>.
      /// </summary>
      /// <param name="address">Address used to look up the channel.</param>
      /// <param name="buffers">Buffers to send.</param>
      public void SendAsync(string address, List<byte[]> buffers)
      {
          AChannel channel = this.service.GetChannel(address);
          channel.SendAsync(buffers);
      }

      /// <summary>
      /// Message callback or timeout callback: invokes and forgets the callback registered for
      /// a request id. Does nothing when no callback is registered for that id (for example
      /// because it has already been invoked).
      /// </summary>
      /// <param name="channel">Channel the message came from; not used by this method.</param>
      /// <param name="id">Request id whose callback should run.</param>
      /// <param name="buffer">Full received message, or <c>null</c> for a timeout.</param>
      /// <param name="responseStatus">Outcome passed to the callback.</param>
      public void RequestCallback(AChannel channel, int id, byte[] buffer, RpcResponseStatus responseStatus)
      {
          Action<byte[], RpcResponseStatus> action;
          if (!this.requestCallback.TryGetValue(id, out action))
          {
              return;
          }

          // Remove before invoking so a request is completed at most once.
          this.requestCallback.Remove(id);
          action(buffer, responseStatus);
      }

      /// <summary>
      /// Sends an RPC request and returns a task that completes with the decoded response.
      /// </summary>
      /// <typeparam name="T">Response type decoded from the response payload.</typeparam>
      /// <typeparam name="K">Request type serialized into the request payload.</typeparam>
      /// <param name="address">Address used to look up the channel.</param>
      /// <param name="type">Opcode written at the start of the request message.</param>
      /// <param name="request">Request object to serialize.</param>
      /// <param name="waitTime">
      /// When greater than zero, a timeout is scheduled at <c>TimeHelper.Now() + waitTime</c>;
      /// otherwise the request never times out.
      /// </param>
      /// <returns>
      /// A task that yields the response, or faults with a timeout exception, with the exception
      /// sent by the remote side, or with the error raised while decoding the answer.
      /// </returns>
      public Task<T> RpcRequest<T, K>(string address, short type, K request, int waitTime = 0)
      {
          AChannel channel = this.service.GetChannel(address);

          // Keep this request's id in a local. The closures below must not read the
          // requestId field later, because newer requests will have incremented it by then.
          int id = ++this.requestId;

          byte[] requestBuffer = MongoHelper.ToBson(request);
          byte[] typeBuffer = BitConverter.GetBytes(type);
          byte[] idBuffer = BitConverter.GetBytes(id);
          channel.SendAsync(new List<byte[]> { typeBuffer, idBuffer, requestBuffer });

          var tcs = new TaskCompletionSource<T>();
          this.requestCallback[id] = (messageBytes, status) =>
          {
              if (status == RpcResponseStatus.Timeout)
              {
                  tcs.SetException(new Exception(
                          string.Format("rpc timeout {0} {1}", type, MongoHelper.ToJson(request))));
                  return;
              }

              if (status == RpcResponseStatus.Exception)
              {
                  Exception exception;
                  try
                  {
                      exception = DeserializeException(messageBytes);
                  }
                  catch (Exception e)
                  {
                      // An undecodable exception payload must still fail the task instead of
                      // escaping into the receive loop and leaving the caller waiting forever.
                      exception = e;
                  }
                  tcs.SetException(exception);
                  return;
              }

              // RpcResponseStatus.Succee
              T response;
              try
              {
                  response = MongoHelper.FromBson<T>(messageBytes, RpcHeaderSize);
              }
              catch (Exception e)
              {
                  // Same reasoning as above: surface decode failures through the task.
                  tcs.SetException(e);
                  return;
              }
              tcs.SetResult(response);
          };

          if (waitTime > 0)
          {
              this.service.Timer.Add(TimeHelper.Now() + waitTime,
                      () => { this.RequestCallback(channel, id, null, RpcResponseStatus.Timeout); });
          }
          return tcs.Task;
      }

      /// <summary>
      /// Reads the exception serialized after the RPC header of an exception message.
      /// </summary>
      /// <param name="messageBytes">Full RPC exception message, header included.</param>
      /// <returns>The deserialized exception.</returns>
      private static Exception DeserializeException(byte[] messageBytes)
      {
          // NOTE(review): BinaryFormatter deserialization of bytes received from the network is
          // unsafe against untrusted peers; consider replacing the exception wire format.
          BinaryFormatter formatter = new BinaryFormatter(null, new StreamingContext(StreamingContextStates.All));
          using (MemoryStream stream = new MemoryStream(messageBytes))
          {
              stream.Seek(RpcHeaderSize, SeekOrigin.Begin);
              return (Exception)formatter.Deserialize(stream);
          }
      }

      /// <summary>
      /// Sends an RPC response: response opcode, request id, then the BSON-serialized response.
      /// </summary>
      /// <typeparam name="T">Type of the response object.</typeparam>
      /// <param name="channel">Channel to send the response on.</param>
      /// <param name="id">Id of the request being answered.</param>
      /// <param name="response">Response object to serialize.</param>
      public void RpcResponse<T>(AChannel channel, int id, T response)
      {
          byte[] responseBuffer = MongoHelper.ToBson(response);
          byte[] typeBuffer = BitConverter.GetBytes(Opcode.RpcResponse);
          byte[] idBuffer = BitConverter.GetBytes(id);
          channel.SendAsync(new List<byte[]> { typeBuffer, idBuffer, responseBuffer });
      }

      /// <summary>
      /// Sends an RPC exception: exception opcode, request id, then the exception serialized
      /// with <see cref="BinaryFormatter"/>.
      /// </summary>
      /// <param name="channel">Channel to send the exception on.</param>
      /// <param name="id">Id of the request being answered.</param>
      /// <param name="e">Exception to serialize and send.</param>
      public void RpcException(AChannel channel, int id, Exception e)
      {
          byte[] opcodeBuffer = BitConverter.GetBytes(Opcode.RpcException);
          byte[] idBuffer = BitConverter.GetBytes(id);
          BinaryFormatter formatter = new BinaryFormatter(null, new StreamingContext(StreamingContextStates.All));
          using (MemoryStream stream = new MemoryStream())
          {
              formatter.Serialize(stream, e);
              channel.SendAsync(new List<byte[]> { opcodeBuffer, idBuffer, stream.ToArray() });
          }
      }
  }
  215. }