|
|
@@ -22,13 +22,12 @@ namespace Model
|
|
|
{
|
|
|
private static uint RpcId { get; set; }
|
|
|
private readonly Dictionary<uint, Action<byte[], int, int>> requestCallback = new Dictionary<uint, Action<byte[], int, int>>();
|
|
|
- private readonly Dictionary<ushort, Action<byte[], int, int>> waitCallback = new Dictionary<ushort, Action<byte[], int, int>>();
|
|
|
private AChannel channel;
|
|
|
- private MessageHandlerComponent messageHandler;
|
|
|
+ private MessageDispatherComponent messageDispather;
|
|
|
|
|
|
public void Awake(AChannel aChannel)
|
|
|
{
|
|
|
- this.messageHandler = Game.Scene.GetComponent<MessageHandlerComponent>();
|
|
|
+ this.messageDispather = Game.Scene.GetComponent<MessageDispatherComponent>();
|
|
|
this.channel = aChannel;
|
|
|
this.StartRecv();
|
|
|
}
|
|
|
@@ -81,8 +80,10 @@ namespace Model
|
|
|
private void Run(ushort opcode, byte[] messageBytes)
|
|
|
{
|
|
|
int offset = 0;
|
|
|
- uint flagUInt = BitConverter.ToUInt32(messageBytes, 2);
|
|
|
- bool isCompressed = (flagUInt & 0x80000000) > 0;
|
|
|
+ uint flag = BitConverter.ToUInt32(messageBytes, 2);
|
|
|
+ uint rpcFlag = flag & 0x40000000;
|
|
|
+ uint rpcId = flag & 0x3fffffff;
|
|
|
+ bool isCompressed = (flag & 0x80000000) > 0;
|
|
|
if (isCompressed) // 最高位为1,表示有压缩,需要解压缩
|
|
|
{
|
|
|
messageBytes = ZipHelper.Decompress(messageBytes, 6, messageBytes.Length - 6);
|
|
|
@@ -92,34 +93,41 @@ namespace Model
|
|
|
{
|
|
|
offset = 6;
|
|
|
}
|
|
|
- uint rpcId = flagUInt & 0x7fffffff;
|
|
|
- this.RunDecompressedBytes(opcode, rpcId, messageBytes, offset);
|
|
|
+
|
|
|
+ this.RunDecompressedBytes(opcode, rpcId, rpcFlag, messageBytes, offset);
|
|
|
}
|
|
|
|
|
|
- private void RunDecompressedBytes(ushort opcode, uint rpcId, byte[] messageBytes, int offset)
|
|
|
+ private void RunDecompressedBytes(ushort opcode, uint rpcId, uint rpcFlag, byte[] messageBytes, int offset)
|
|
|
{
|
|
|
- Action<byte[], int, int> action;
|
|
|
- if (this.requestCallback.TryGetValue(rpcId, out action))
|
|
|
+ if (rpcId == 0)
|
|
|
{
|
|
|
- this.requestCallback.Remove(rpcId);
|
|
|
- action(messageBytes, offset, messageBytes.Length - offset);
|
|
|
- return;
|
|
|
+ this.messageDispather.Handle(this.Owner, opcode, messageBytes, offset);
|
|
|
}
|
|
|
|
|
|
- if (this.waitCallback.TryGetValue(opcode, out action))
|
|
|
+ // rpcFlag>0 表示这是一个rpc响应消息
|
|
|
+ if (rpcFlag > 0)
|
|
|
{
|
|
|
- this.waitCallback.Remove(opcode);
|
|
|
- action(messageBytes, offset, messageBytes.Length - offset);
|
|
|
- return;
|
|
|
+ Action<byte[], int, int> action;
|
|
|
+ if (this.requestCallback.TryGetValue(rpcId, out action))
|
|
|
+ {
|
|
|
+ this.requestCallback.Remove(rpcId);
|
|
|
+ action(messageBytes, offset, messageBytes.Length - offset);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else // 这是一个rpc请求消息
|
|
|
+ {
|
|
|
+ this.messageDispather.HandleRpc(this.Owner, opcode, messageBytes, offset, rpcId);
|
|
|
}
|
|
|
-
|
|
|
- this.messageHandler.Handle(this.Owner, opcode, messageBytes, offset);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- public Task<Response> CallAsync<Response>(object request, CancellationToken cancellationToken) where Response : IErrorMessage
|
|
|
+ /// <summary>
|
|
|
+ /// Rpc调用
|
|
|
+ /// </summary>
|
|
|
+ public Task<Response> Call<Request, Response>(Request request, CancellationToken cancellationToken)
|
|
|
+ where Request : ARequest
|
|
|
+ where Response : AResponse
|
|
|
{
|
|
|
- this.Send(++RpcId, request);
|
|
|
+ this.SendMessage(++RpcId, request);
|
|
|
|
|
|
var tcs = new TaskCompletionSource<Response>();
|
|
|
|
|
|
@@ -128,9 +136,9 @@ namespace Model
|
|
|
try
|
|
|
{
|
|
|
Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
|
|
|
- if (response.ErrorMessage.Errno != 0)
|
|
|
+ if (response.ErrorCode != 0)
|
|
|
{
|
|
|
- tcs.SetException(new RpcException(response.ErrorMessage.Errno, response.ErrorMessage.Message));
|
|
|
+ tcs.SetException(new RpcException(response.ErrorCode, response.Message));
|
|
|
return;
|
|
|
}
|
|
|
tcs.SetResult(response);
|
|
|
@@ -149,21 +157,21 @@ namespace Model
|
|
|
/// <summary>
|
|
|
/// Rpc调用,发送一个消息,等待返回一个消息
|
|
|
/// </summary>
|
|
|
- /// <typeparam name="Response"></typeparam>
|
|
|
- /// <param name="request"></param>
|
|
|
- /// <returns></returns>
|
|
|
- public Task<Response> CallAsync<Response>(object request) where Response : IErrorMessage
|
|
|
+ public Task<Response> Call<Request, Response>(Request request)
|
|
|
+ where Request: ARequest
|
|
|
+ where Response : AResponse
|
|
|
{
|
|
|
- this.Send(++RpcId, request);
|
|
|
+ request.RpcId = ++RpcId;
|
|
|
+ this.SendMessage(++RpcId, request);
|
|
|
var tcs = new TaskCompletionSource<Response>();
|
|
|
this.requestCallback[RpcId] = (bytes, offset, count) =>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
|
|
|
- if (response.ErrorMessage.Errno != 0)
|
|
|
+ if (response.ErrorCode != 0)
|
|
|
{
|
|
|
- tcs.SetException(new RpcException(response.ErrorMessage.Errno, response.ErrorMessage.Message));
|
|
|
+ tcs.SetException(new RpcException(response.ErrorCode, response.Message));
|
|
|
return;
|
|
|
}
|
|
|
tcs.SetResult(response);
|
|
|
@@ -177,68 +185,24 @@ namespace Model
|
|
|
return tcs.Task;
|
|
|
}
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// 不发送消息,直接等待返回一个消息
|
|
|
- /// </summary>
|
|
|
- /// <typeparam name="Response"></typeparam>
|
|
|
- /// <param name="cancellationToken"></param>
|
|
|
- /// <returns></returns>
|
|
|
- public Task<Response> WaitAsync<Response>(CancellationToken cancellationToken) where Response : class
|
|
|
- {
|
|
|
- var tcs = new TaskCompletionSource<Response>();
|
|
|
- ushort opcode = this.messageHandler.messageOpcode[typeof(Response)];
|
|
|
- this.waitCallback[opcode] = (bytes, offset, count) =>
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
|
|
|
- tcs.SetResult(response);
|
|
|
- }
|
|
|
- catch (Exception e)
|
|
|
- {
|
|
|
- tcs.SetException(new Exception($"Wait Error: {typeof(Response).FullName}", e));
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- cancellationToken.Register(() => { this.waitCallback.Remove(opcode); });
|
|
|
-
|
|
|
- return tcs.Task;
|
|
|
- }
|
|
|
-
|
|
|
- /// <summary>
|
|
|
- /// 不发送消息,直接等待返回一个消息
|
|
|
- /// </summary>
|
|
|
- /// <typeparam name="Response"></typeparam>
|
|
|
- /// <returns></returns>
|
|
|
- public Task<Response> WaitAsync<Response>() where Response : class
|
|
|
+ public void Send(object message)
|
|
|
{
|
|
|
- var tcs = new TaskCompletionSource<Response>();
|
|
|
- ushort opcode = this.messageHandler.messageOpcode[typeof(Response)];
|
|
|
- this.waitCallback[opcode] = (bytes, offset, count) =>
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
|
|
|
- tcs.SetResult(response);
|
|
|
- }
|
|
|
- catch (Exception e)
|
|
|
- {
|
|
|
- tcs.SetException(new Exception($"Wait Error: {typeof(Response).FullName}", e));
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- return tcs.Task;
|
|
|
+ this.SendMessage(0, message);
|
|
|
}
|
|
|
|
|
|
- public void Send(object message)
|
|
|
+ public void Reply<T>(uint rpcId, T message) where T: AResponse
|
|
|
{
|
|
|
- this.Send(0, message);
|
|
|
+ this.SendMessage(rpcId, message, false);
|
|
|
}
|
|
|
|
|
|
- public void Send(uint rpcId, object message)
|
|
|
+ private void SendMessage(uint rpcId, object message, bool isCall = true)
|
|
|
{
|
|
|
- ushort opcode = this.messageHandler.GetOpcode(message.GetType());
|
|
|
+ ushort opcode = this.messageDispather.GetOpcode(message.GetType());
|
|
|
byte[] opcodeBytes = BitConverter.GetBytes(opcode);
|
|
|
+ if (rpcId > 0 && !isCall)
|
|
|
+ {
|
|
|
+ rpcId = rpcId | 0x4fffffff;
|
|
|
+ }
|
|
|
byte[] seqBytes = BitConverter.GetBytes(rpcId);
|
|
|
byte[] messageBytes = MongoHelper.ToBson(message);
|
|
|
|