|
|
@@ -20,7 +20,7 @@ namespace Model
|
|
|
/// </summary>
|
|
|
public class MessageComponent: Component
|
|
|
{
|
|
|
- private uint RpcId { get; set; }
|
|
|
+ private static uint RpcId { get; set; }
|
|
|
private readonly Dictionary<uint, Action<byte[], int, int>> requestCallback = new Dictionary<uint, Action<byte[], int, int>>();
|
|
|
private readonly Dictionary<ushort, Action<byte[], int, int>> waitCallback = new Dictionary<ushort, Action<byte[], int, int>>();
|
|
|
private AChannel channel;
|
|
|
@@ -82,8 +82,8 @@ namespace Model
|
|
|
{
|
|
|
int offset = 0;
|
|
|
uint flagUInt = BitConverter.ToUInt32(messageBytes, 2);
|
|
|
- bool isCompressed = (byte)(flagUInt >> 24) == 1;
|
|
|
- if (isCompressed) // 表示有压缩,需要解压缩
|
|
|
+ bool isCompressed = (flagUInt & 0x80000000) > 0;
|
|
|
+ if (isCompressed) // 最高位为1,表示有压缩,需要解压缩
|
|
|
{
|
|
|
messageBytes = ZipHelper.Decompress(messageBytes, 6, messageBytes.Length - 6);
|
|
|
offset = 0;
|
|
|
@@ -92,7 +92,7 @@ namespace Model
|
|
|
{
|
|
|
offset = 6;
|
|
|
}
|
|
|
- uint rpcId = flagUInt & 0x0fff;
|
|
|
+ uint rpcId = flagUInt & 0x7fffffff;
|
|
|
this.RunDecompressedBytes(opcode, rpcId, messageBytes, offset);
|
|
|
}
|
|
|
|
|
|
@@ -119,11 +119,11 @@ namespace Model
|
|
|
|
|
|
public Task<Response> CallAsync<Response>(object request, CancellationToken cancellationToken) where Response : IErrorMessage
|
|
|
{
|
|
|
- this.Send(request, ++this.RpcId);
|
|
|
+ this.Send(++RpcId, request);
|
|
|
|
|
|
var tcs = new TaskCompletionSource<Response>();
|
|
|
|
|
|
- this.requestCallback[this.RpcId] = (bytes, offset, count) =>
|
|
|
+ this.requestCallback[RpcId] = (bytes, offset, count) =>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
@@ -141,7 +141,7 @@ namespace Model
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- cancellationToken.Register(() => { this.requestCallback.Remove(this.RpcId); });
|
|
|
+ cancellationToken.Register(() => { this.requestCallback.Remove(RpcId); });
|
|
|
|
|
|
return tcs.Task;
|
|
|
}
|
|
|
@@ -154,10 +154,10 @@ namespace Model
|
|
|
/// <returns></returns>
|
|
|
public Task<Response> CallAsync<Response>(object request) where Response : IErrorMessage
|
|
|
{
|
|
|
- this.Send(request, ++this.RpcId);
|
|
|
+ this.Send(++RpcId, request);
|
|
|
|
|
|
var tcs = new TaskCompletionSource<Response>();
|
|
|
- this.requestCallback[this.RpcId] = (bytes, offset, count) =>
|
|
|
+ this.requestCallback[RpcId] = (bytes, offset, count) =>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
@@ -187,7 +187,7 @@ namespace Model
|
|
|
public Task<Response> WaitAsync<Response>(CancellationToken cancellationToken) where Response : class
|
|
|
{
|
|
|
var tcs = new TaskCompletionSource<Response>();
|
|
|
- ushort opcode = this.messageHandler.MessageOpcode[typeof(Response)];
|
|
|
+ ushort opcode = this.messageHandler.messageOpcode[typeof(Response)];
|
|
|
this.waitCallback[opcode] = (bytes, offset, count) =>
|
|
|
{
|
|
|
try
|
|
|
@@ -214,7 +214,7 @@ namespace Model
|
|
|
public Task<Response> WaitAsync<Response>() where Response : class
|
|
|
{
|
|
|
var tcs = new TaskCompletionSource<Response>();
|
|
|
- ushort opcode = this.messageHandler.MessageOpcode[typeof(Response)];
|
|
|
+ ushort opcode = this.messageHandler.messageOpcode[typeof(Response)];
|
|
|
this.waitCallback[opcode] = (bytes, offset, count) =>
|
|
|
{
|
|
|
try
|
|
|
@@ -233,12 +233,12 @@ namespace Model
|
|
|
|
|
|
public void Send(object message)
|
|
|
{
|
|
|
- this.Send(message, 0);
|
|
|
+ this.Send(0, message);
|
|
|
}
|
|
|
|
|
|
- private void Send(object message, uint rpcId)
|
|
|
+ public void Send(uint rpcId, object message)
|
|
|
{
|
|
|
- ushort opcode = this.messageHandler.MessageOpcode[message.GetType()];
|
|
|
+ ushort opcode = this.messageHandler.GetOpcode(message.GetType());
|
|
|
byte[] opcodeBytes = BitConverter.GetBytes(opcode);
|
|
|
byte[] seqBytes = BitConverter.GetBytes(rpcId);
|
|
|
byte[] messageBytes = MongoHelper.ToBson(message);
|