NetworkComponent.cs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Runtime.Serialization;
  5. using System.Runtime.Serialization.Formatters.Binary;
  6. using System.Threading.Tasks;
  7. using Common.Base;
  8. using Common.Helper;
  9. using Common.Network;
  10. using MongoDB.Bson;
  11. using TNet;
  12. using UNet;
  13. namespace Model
  14. {
  15. public enum RpcResponseStatus
  16. {
  17. Succee,
  18. Timeout,
  19. Exception,
  20. }
  21. public class RpcExcetionInfo
  22. {
  23. public int ErrorCode { get; private set; }
  24. public string ErrorInfo { get; private set; }
  25. public RpcExcetionInfo(int errorCode, string errorInfo)
  26. {
  27. this.ErrorCode = errorCode;
  28. this.ErrorInfo = errorInfo;
  29. }
  30. }
  31. public class NetworkComponent: Component<World>, IUpdate, IStart
  32. {
  33. private IService service;
  34. private int requestId;
  35. private readonly Dictionary<int, Action<byte[], RpcResponseStatus>> requestCallback =
  36. new Dictionary<int, Action<byte[], RpcResponseStatus>>();
  37. private void Accept(string host, int port, NetworkProtocol protocol = NetworkProtocol.TCP)
  38. {
  39. switch (protocol)
  40. {
  41. case NetworkProtocol.TCP:
  42. this.service = new TService(host, port);
  43. break;
  44. case NetworkProtocol.UDP:
  45. this.service = new UService(host, port);
  46. break;
  47. default:
  48. throw new ArgumentOutOfRangeException("protocol");
  49. }
  50. this.AcceptChannel();
  51. }
  52. public void Start()
  53. {
  54. this.Accept(World.Instance.Options.Host, World.Instance.Options.Port,
  55. World.Instance.Options.Protocol);
  56. }
  57. public void Update()
  58. {
  59. this.service.Update();
  60. }
  61. /// <summary>
  62. /// 接收连接
  63. /// </summary>
  64. private async void AcceptChannel()
  65. {
  66. while (true)
  67. {
  68. AChannel channel = await this.service.GetChannel();
  69. this.ProcessChannel(channel);
  70. }
  71. }
  72. /// <summary>
  73. /// 接收分发封包
  74. /// </summary>
  75. /// <param name="channel"></param>
  76. private async void ProcessChannel(AChannel channel)
  77. {
  78. while (true)
  79. {
  80. byte[] messageBytes = await channel.RecvAsync();
  81. Opcode opcode = (Opcode)BitConverter.ToUInt16(messageBytes, 0);
  82. // rpc异常
  83. if (opcode == Opcode.RpcException)
  84. {
  85. int id = BitConverter.ToInt32(messageBytes, 2);
  86. this.RpcCallback(channel, id, messageBytes, RpcResponseStatus.Exception);
  87. continue;
  88. }
  89. // 表示消息是rpc响应消息
  90. if (opcode == Opcode.RpcResponse)
  91. {
  92. int id = BitConverter.ToInt32(messageBytes, 2);
  93. this.RpcCallback(channel, id, messageBytes, RpcResponseStatus.Succee);
  94. continue;
  95. }
  96. // 如果是server message(发给client的消息),说明这是gate server,需要根据unitid查到channel,进行发送
  97. if (MessageTypeHelper.IsServerMessage(opcode))
  98. {
  99. byte[] idBuffer = new byte[12];
  100. Array.Copy(messageBytes, 2, idBuffer, 0, 12);
  101. ObjectId unitId = new ObjectId(idBuffer);
  102. byte[] buffer = new byte[messageBytes.Length - 6];
  103. Array.Copy(messageBytes, 6, buffer, 0, buffer.Length);
  104. World.Instance.GetComponent<GateNetworkComponent>().SendAsync(unitId, buffer);
  105. continue;
  106. }
  107. // 处理Rpc请求,并且返回结果
  108. RpcDo(channel, opcode, messageBytes);
  109. }
  110. }
  111. private async static void RpcDo(AChannel channel, Opcode opcode, byte[] messageBytes)
  112. {
  113. byte[] opcodeBuffer;
  114. int id = BitConverter.ToInt32(messageBytes, 2);
  115. byte[] idBuffer = BitConverter.GetBytes(id);
  116. try
  117. {
  118. opcodeBuffer = BitConverter.GetBytes((ushort)Opcode.RpcResponse);
  119. byte[] result = await World.Instance.GetComponent<MessageComponent>().RunAsync(opcode, messageBytes);
  120. channel.SendAsync(new List<byte[]> { opcodeBuffer, idBuffer, result });
  121. }
  122. catch (Exception e)
  123. {
  124. opcodeBuffer = BitConverter.GetBytes((ushort)Opcode.RpcException);
  125. BinaryFormatter formatter = new BinaryFormatter(null, new StreamingContext(StreamingContextStates.All));
  126. using (MemoryStream stream = new MemoryStream())
  127. {
  128. formatter.Serialize(stream, e);
  129. channel.SendAsync(new List<byte[]> { opcodeBuffer, idBuffer, stream.ToArray() });
  130. }
  131. }
  132. }
  133. // 消息回调或者超时回调
  134. public void RpcCallback(AChannel channel, int id, byte[] buffer, RpcResponseStatus responseStatus)
  135. {
  136. Action<byte[], RpcResponseStatus> action;
  137. if (!this.requestCallback.TryGetValue(id, out action))
  138. {
  139. return;
  140. }
  141. this.requestCallback.Remove(id);
  142. action(buffer, responseStatus);
  143. }
  144. /// <summary>
  145. /// Rpc请求
  146. /// </summary>
  147. public Task<T> RpcCall<T, K>(string address, K request, int waitTime = 0)
  148. {
  149. AChannel channel = this.service.GetChannel(address);
  150. ++this.requestId;
  151. byte[] requestBuffer = MongoHelper.ToBson(request);
  152. Opcode opcode = (Opcode)Enum.Parse(typeof(Opcode), request.GetType().Name);
  153. byte[] opcodeBuffer = BitConverter.GetBytes((ushort)opcode);
  154. byte[] idBuffer = BitConverter.GetBytes(this.requestId);
  155. channel.SendAsync(new List<byte[]> { opcodeBuffer, idBuffer, requestBuffer });
  156. var tcs = new TaskCompletionSource<T>();
  157. this.requestCallback[this.requestId] = (messageBytes, status) =>
  158. {
  159. if (status == RpcResponseStatus.Timeout)
  160. {
  161. tcs.SetException(new Exception(
  162. string.Format("rpc timeout {0} {1}", opcode, MongoHelper.ToJson(request))));
  163. return;
  164. }
  165. if (status == RpcResponseStatus.Exception)
  166. {
  167. BinaryFormatter formatter = new BinaryFormatter(null, new StreamingContext(StreamingContextStates.All));
  168. Exception exception;
  169. using (MemoryStream stream = new MemoryStream(messageBytes))
  170. {
  171. stream.Seek(6, SeekOrigin.Begin);
  172. exception = (Exception)formatter.Deserialize(stream);
  173. }
  174. tcs.SetException(exception);
  175. return;
  176. }
  177. // RpcResponseStatus.Succee
  178. T response = MongoHelper.FromBson<T>(messageBytes, 6);
  179. tcs.SetResult(response);
  180. };
  181. if (waitTime > 0)
  182. {
  183. this.service.Timer.Add(TimeHelper.Now() + waitTime,
  184. () => { this.RpcCallback(channel, this.requestId, null, RpcResponseStatus.Timeout); });
  185. }
  186. return tcs.Task;
  187. }
  188. }
  189. }