Session.cs 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Net;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. namespace Model
  7. {
  8. public sealed class Session : Entity
  9. {
  10. private static uint RpcId { get; set; }
  11. private readonly NetworkComponent network;
  12. private readonly Dictionary<uint, Action<object>> requestCallback = new Dictionary<uint, Action<object>>();
  13. private readonly AChannel channel;
  14. private readonly List<byte[]> byteses = new List<byte[]>() {new byte[0], new byte[0]};
  15. public Session(NetworkComponent network, AChannel channel)
  16. {
  17. this.network = network;
  18. this.channel = channel;
  19. this.StartRecv();
  20. }
  21. public IPEndPoint RemoteAddress
  22. {
  23. get
  24. {
  25. return this.channel.RemoteAddress;
  26. }
  27. }
  28. public ChannelType ChannelType
  29. {
  30. get
  31. {
  32. return this.channel.ChannelType;
  33. }
  34. }
  35. private async void StartRecv()
  36. {
  37. while (true)
  38. {
  39. if (this.Id == 0)
  40. {
  41. return;
  42. }
  43. byte[] messageBytes;
  44. try
  45. {
  46. messageBytes = await channel.Recv();
  47. if (this.Id == 0)
  48. {
  49. return;
  50. }
  51. }
  52. catch (Exception e)
  53. {
  54. Log.Error(e.ToString());
  55. continue;
  56. }
  57. if (messageBytes.Length < 3)
  58. {
  59. Log.Error($"message error length < 3, ip: {this.RemoteAddress}");
  60. this.network.Remove(this.Id);
  61. return;
  62. }
  63. ushort opcode = BitConverter.ToUInt16(messageBytes, 0);
  64. try
  65. {
  66. this.Run(opcode, messageBytes);
  67. }
  68. catch (Exception e)
  69. {
  70. Log.Error(e.ToString());
  71. }
  72. }
  73. }
  74. private void Run(ushort opcode, byte[] messageBytes)
  75. {
  76. int offset;
  77. // opcode最高位表示是否压缩
  78. bool isCompressed = (opcode & 0x8000) > 0;
  79. if (isCompressed) // 最高位为1,表示有压缩,需要解压缩
  80. {
  81. messageBytes = ZipHelper.Decompress(messageBytes, 2, messageBytes.Length - 2);
  82. offset = 0;
  83. }
  84. else
  85. {
  86. offset = 2;
  87. }
  88. opcode &= 0x7fff;
  89. this.RunDecompressedBytes(opcode, messageBytes, offset);
  90. }
  91. private void RunDecompressedBytes(ushort opcode, byte[] messageBytes, int offset)
  92. {
  93. object message;
  94. Opcode op;
  95. try
  96. {
  97. op = (Opcode)opcode;
  98. Type messageType = this.network.Entity.GetComponent<OpcodeTypeComponent>().GetType(op);
  99. message = this.network.MessagePacker.DeserializeFrom(messageType, messageBytes, offset, messageBytes.Length - offset);
  100. }
  101. catch (Exception e)
  102. {
  103. Log.Error($"message deserialize error, ip: {this.RemoteAddress} {opcode} {e}");
  104. this.network.Remove(this.Id);
  105. return;
  106. }
  107. //Log.Debug($"recv: {MongoHelper.ToJson(message)}");
  108. AResponse response = message as AResponse;
  109. if (response != null)
  110. {
  111. // rpcFlag>0 表示这是一个rpc响应消息
  112. // Rpc回调有找不着的可能,因为client可能取消Rpc调用
  113. Action<object> action;
  114. if (!this.requestCallback.TryGetValue(response.RpcId, out action))
  115. {
  116. return;
  117. }
  118. this.requestCallback.Remove(response.RpcId);
  119. action(message);
  120. return;
  121. }
  122. this.network.MessageDispatcher.Dispatch(this, op, offset, messageBytes, (AMessage)message);
  123. }
  124. /// <summary>
  125. /// Rpc调用
  126. /// </summary>
  127. public void CallWithAction(ARequest request, Action<AResponse> action)
  128. {
  129. request.RpcId = ++RpcId;
  130. this.SendMessage(request);
  131. this.requestCallback[RpcId] = (message) =>
  132. {
  133. try
  134. {
  135. AResponse response = (AResponse)message;
  136. action(response);
  137. }
  138. catch (Exception e)
  139. {
  140. Log.Error(e.ToString());
  141. }
  142. };
  143. }
  144. /// <summary>
  145. /// Rpc调用,发送一个消息,等待返回一个消息
  146. /// </summary>
  147. public Task<AResponse> Call(ARequest request, bool isHotfix)
  148. {
  149. request.RpcId = ++RpcId;
  150. this.SendMessage(request);
  151. var tcs = new TaskCompletionSource<AResponse>();
  152. this.requestCallback[RpcId] = (message) =>
  153. {
  154. try
  155. {
  156. AResponse response = (AResponse)message;
  157. if (response.Error > 100)
  158. {
  159. tcs.SetException(new RpcException(response.Error, response.Message));
  160. return;
  161. }
  162. //Log.Debug($"recv: {MongoHelper.ToJson(response)}");
  163. tcs.SetResult(response);
  164. }
  165. catch (Exception e)
  166. {
  167. tcs.SetException(new Exception($"Rpc Error: {message.GetType().FullName}", e));
  168. }
  169. };
  170. return tcs.Task;
  171. }
  172. /// <summary>
  173. /// Rpc调用
  174. /// </summary>
  175. public Task<AResponse> Call(ARequest request, bool isHotfix, CancellationToken cancellationToken)
  176. {
  177. request.RpcId = ++RpcId;
  178. this.SendMessage(request);
  179. var tcs = new TaskCompletionSource<AResponse>();
  180. this.requestCallback[RpcId] = (message) =>
  181. {
  182. try
  183. {
  184. AResponse response = (AResponse)message;
  185. if (response.Error > 100)
  186. {
  187. tcs.SetException(new RpcException(response.Error, response.Message));
  188. return;
  189. }
  190. //Log.Debug($"recv: {MongoHelper.ToJson(response)}");
  191. tcs.SetResult(response);
  192. }
  193. catch (Exception e)
  194. {
  195. tcs.SetException(new Exception($"Rpc Error: {message.GetType().FullName}", e));
  196. }
  197. };
  198. cancellationToken.Register(() => { this.requestCallback.Remove(RpcId); });
  199. return tcs.Task;
  200. }
  201. /// <summary>
  202. /// Rpc调用,发送一个消息,等待返回一个消息
  203. /// </summary>
  204. public Task<Response> Call<Response>(ARequest request) where Response : AResponse
  205. {
  206. request.RpcId = ++RpcId;
  207. this.SendMessage(request);
  208. var tcs = new TaskCompletionSource<Response>();
  209. this.requestCallback[RpcId] = (message) =>
  210. {
  211. try
  212. {
  213. Response response = (Response)message;
  214. if (response.Error > 100)
  215. {
  216. tcs.SetException(new RpcException(response.Error, response.Message));
  217. return;
  218. }
  219. //Log.Debug($"recv: {MongoHelper.ToJson(response)}");
  220. tcs.SetResult(response);
  221. }
  222. catch (Exception e)
  223. {
  224. tcs.SetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
  225. }
  226. };
  227. return tcs.Task;
  228. }
  229. /// <summary>
  230. /// Rpc调用
  231. /// </summary>
  232. public Task<Response> Call<Response>(ARequest request, CancellationToken cancellationToken)
  233. where Response : AResponse
  234. {
  235. request.RpcId = ++RpcId;
  236. this.SendMessage(request);
  237. var tcs = new TaskCompletionSource<Response>();
  238. this.requestCallback[RpcId] = (message) =>
  239. {
  240. try
  241. {
  242. Response response = (Response)message;
  243. if (response.Error > 100)
  244. {
  245. tcs.SetException(new RpcException(response.Error, response.Message));
  246. return;
  247. }
  248. //Log.Debug($"recv: {MongoHelper.ToJson(response)}");
  249. tcs.SetResult(response);
  250. }
  251. catch (Exception e)
  252. {
  253. tcs.SetException(new Exception($"Rpc Error: {typeof(Response).FullName}", e));
  254. }
  255. };
  256. cancellationToken.Register(() => { this.requestCallback.Remove(RpcId); });
  257. return tcs.Task;
  258. }
  259. public void Send(AMessage message)
  260. {
  261. if (this.Id == 0)
  262. {
  263. throw new Exception("session已经被Dispose了");
  264. }
  265. this.SendMessage(message);
  266. }
  267. public void Reply<Response>(Response message) where Response : AResponse
  268. {
  269. if (this.Id == 0)
  270. {
  271. throw new Exception("session已经被Dispose了");
  272. }
  273. this.SendMessage(message);
  274. }
  275. private void SendMessage(object message)
  276. {
  277. //Log.Debug($"send: {MongoHelper.ToJson(message)}");
  278. Opcode opcode = this.network.Entity.GetComponent<OpcodeTypeComponent>().GetOpcode(message.GetType());
  279. ushort op = (ushort)opcode;
  280. byte[] messageBytes = this.network.MessagePacker.SerializeToByteArray(message);
  281. if (messageBytes.Length > 100)
  282. {
  283. byte[] newMessageBytes = ZipHelper.Compress(messageBytes);
  284. if (newMessageBytes.Length < messageBytes.Length)
  285. {
  286. messageBytes = newMessageBytes;
  287. op |= 0x8000;
  288. }
  289. }
  290. byte[] opcodeBytes = BitConverter.GetBytes(op);
  291. this.byteses[0] = opcodeBytes;
  292. this.byteses[1] = messageBytes;
  293. channel.Send(this.byteses);
  294. }
  295. public override void Dispose()
  296. {
  297. if (this.Id == 0)
  298. {
  299. return;
  300. }
  301. long id = this.Id;
  302. base.Dispose();
  303. this.channel.Dispose();
  304. this.network.Remove(id);
  305. }
  306. }
  307. }