// AChannel.cs
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading.Tasks;
  4. using Common.Base;
  5. using Common.Helper;
  6. namespace Common.Network
  7. {
  /// <summary>
  /// Bit flags that accompany a packet handed to <c>AChannel.SendAsync</c>.
  /// Values are distinct powers of two so they can be combined with <c>|</c>.
  /// NOTE(review): the member names look like they mirror the transport
  /// library's packet flags; confirm exact semantics against the concrete channel.
  /// </summary>
  [Flags]
  public enum PacketFlags
  {
      /// <summary>No flag set.</summary>
      None = 0,

      /// <summary>Bit 0. This is the default used by <c>AChannel.SendAsync</c>.</summary>
      Reliable = 1 << 0,

      /// <summary>Bit 1.</summary>
      Unsequenced = 1 << 1,

      /// <summary>Bit 2.</summary>
      NoAllocate = 1 << 2
  }
  /// <summary>
  /// Abstract base for a network channel. Concrete channels supply the raw
  /// send/receive/disconnect operations; this class layers a small
  /// request/response (RPC) mechanism on top of <see cref="SendAsync(List{byte[]}, byte, PacketFlags)"/>.
  /// </summary>
  public abstract class AChannel: Entity<AChannel>, IDisposable
  {
      /// <summary>
      /// Size in bytes of the header that <c>Request</c> and <c>Response</c> place
      /// in front of the BSON payload: a 2-byte message type followed by a
      /// 4-byte request id.
      /// </summary>
      private const int HeaderSize = sizeof(short) + sizeof(int);

      /// <summary>Service this channel was created with; its timer is used for RPC timeouts.</summary>
      protected IService service;

      /// <summary>Id of the most recently issued RPC request; incremented for every <c>Request</c> call.</summary>
      private int requestId;

      /// <summary>
      /// Backing delegate for <see cref="OnDispose"/>. Starts as a no-op so it is never null.
      /// Nothing in this class invokes it; presumably derived classes do so from Dispose.
      /// </summary>
      protected Action<AChannel> onDispose = channel => { };

      /// <summary>Pending RPC callbacks keyed by request id. The bool argument is the isOK flag.</summary>
      private readonly Dictionary<int, Action<byte[], bool>> requestCallback = new Dictionary<int, Action<byte[], bool>>();

      /// <summary>
      /// Stores the owning service.
      /// </summary>
      /// <param name="service">Service that owns this channel.</param>
      protected AChannel(IService service)
      {
          this.service = service;
      }

      /// <summary>
      /// Sends a message consisting of a single buffer.
      /// </summary>
      /// <param name="buffer">Bytes to send.</param>
      /// <param name="channelID">Channel id passed through to the implementation; defaults to 0.</param>
      /// <param name="flags">Packet flags; defaults to <see cref="PacketFlags.Reliable"/>.</param>
      public abstract void SendAsync(
              byte[] buffer, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable);

      /// <summary>
      /// Sends a message made up of several buffers.
      /// </summary>
      /// <param name="buffers">Buffers that together form the message.</param>
      /// <param name="channelID">Channel id passed through to the implementation; defaults to 0.</param>
      /// <param name="flags">Packet flags; defaults to <see cref="PacketFlags.Reliable"/>.</param>
      public abstract void SendAsync(
              List<byte[]> buffers, byte channelID = 0, PacketFlags flags = PacketFlags.Reliable);

      /// <summary>
      /// Receives a message.
      /// </summary>
      /// <returns>A task producing the received bytes.</returns>
      public abstract Task<byte[]> RecvAsync();

      /// <summary>
      /// Disconnects the channel. The misspelled name (three n's) is kept because
      /// it is part of the public interface that derived classes override.
      /// </summary>
      /// <returns>A task producing a bool; its meaning is defined by the implementation.</returns>
      public abstract Task<bool> DisconnnectAsync();

      /// <summary>Address of the remote peer, as reported by the implementation.</summary>
      public abstract string RemoteAddress { get; }

      /// <summary>
      /// Subscribes to or unsubscribes from the <see cref="onDispose"/> delegate.
      /// </summary>
      public event Action<AChannel> OnDispose
      {
          add
          {
              this.onDispose += value;
          }
          remove
          {
              this.onDispose -= value;
          }
      }

      /// <summary>Releases the channel; implemented by derived classes.</summary>
      public abstract void Dispose();

      /// <summary>
      /// Message callback or timeout callback: completes the pending request
      /// registered under <paramref name="id"/>, if there is one.
      /// </summary>
      /// <param name="id">Request id the callback was registered with.</param>
      /// <param name="buffer">Response bytes, or null when <paramref name="isOK"/> is false.</param>
      /// <param name="isOK">True for a response, false for a timeout.</param>
      public void RequestCallback(int id, byte[] buffer, bool isOK)
      {
          Action<byte[], bool> action;
          if (!this.requestCallback.TryGetValue(id, out action))
          {
              return;
          }

          // Unregister before invoking so the entry is gone even if the callback
          // throws; a request must be completed at most once.
          this.requestCallback.Remove(id);
          action(buffer, isOK);
      }

      /// <summary>
      /// RPC request: serializes <paramref name="request"/> to BSON, sends it
      /// prefixed with the message type and a fresh request id, and returns a
      /// task that completes when <see cref="RequestCallback"/> is called for that id.
      /// </summary>
      /// <typeparam name="T">Response type the reply is deserialized into.</typeparam>
      /// <typeparam name="K">Request type.</typeparam>
      /// <param name="type">Message type written at the start of the packet.</param>
      /// <param name="request">Request object to serialize.</param>
      /// <param name="waitTime">
      /// Timeout added to <c>TimeHelper.Now()</c> (so it uses that clock's unit);
      /// values less than or equal to 0 disable the timeout.
      /// </param>
      /// <returns>
      /// A task that yields the deserialized response, or faults if the request
      /// times out or the response cannot be deserialized.
      /// </returns>
      public Task<T> Request<T, K>(short type, K request, int waitTime = 0)
      {
          // Keep this request's id in a local. The callbacks below run later, by
          // which time the requestId field may already belong to a newer request.
          int id = ++this.requestId;

          byte[] requestBuffer = MongoHelper.ToBson(request);
          byte[] typeBuffer = BitConverter.GetBytes(type);
          byte[] idBuffer = BitConverter.GetBytes(id);
          this.SendAsync(new List<byte[]> { typeBuffer, idBuffer, requestBuffer });

          var tcs = new TaskCompletionSource<T>();
          this.requestCallback[id] = (buffer, isOK) =>
          {
              if (!isOK)
              {
                  tcs.SetException(new Exception(string.Format("rpc timeout {0} {1}", type, MongoHelper.ToJson(request))));
                  return;
              }

              T response;
              try
              {
                  // The second argument is presumably the payload offset: the
                  // response starts with the type and id header.
                  response = MongoHelper.FromBson<T>(buffer, HeaderSize);
              }
              catch (Exception e)
              {
                  // Surface a malformed response to the awaiting caller instead of
                  // leaving the task pending forever.
                  tcs.SetException(e);
                  return;
              }
              tcs.SetResult(response);
          };

          if (waitTime > 0)
          {
              // If the response arrives first, its callback is already removed
              // and this timer call becomes a no-op.
              this.service.Timer.Add(TimeHelper.Now() + waitTime, () => { this.RequestCallback(id, null, false); });
          }
          return tcs.Task;
      }

      /// <summary>
      /// RPC response: serializes <paramref name="response"/> to BSON and sends it
      /// prefixed with the message type and the id of the request being answered.
      /// </summary>
      /// <typeparam name="T">Response type.</typeparam>
      /// <param name="type">Message type written at the start of the packet.</param>
      /// <param name="id">Id of the request this response answers.</param>
      /// <param name="response">Response object to serialize.</param>
      public void Response<T>(short type, int id, T response)
      {
          byte[] responseBuffer = MongoHelper.ToBson(response);
          byte[] typeBuffer = BitConverter.GetBytes(type);
          byte[] idBuffer = BitConverter.GetBytes(id);
          this.SendAsync(new List<byte[]> { typeBuffer, idBuffer, responseBuffer });
      }
  }
  111. }