|
|
@@ -2,8 +2,6 @@
|
|
|
using System.Collections.Generic;
|
|
|
using System.Threading;
|
|
|
using System.Threading.Tasks;
|
|
|
-using Base;
|
|
|
-using MongoDB.Bson;
|
|
|
|
|
|
namespace Model
|
|
|
{
|
|
|
@@ -11,14 +9,17 @@ namespace Model
|
|
|
{
|
|
|
private static uint RpcId { get; set; }
|
|
|
private readonly NetworkComponent network;
|
|
|
- private readonly Dictionary<uint, Action<byte[], int, int>> requestCallback = new Dictionary<uint, Action<byte[], int, int>>();
|
|
|
+ private readonly Dictionary<uint, Action<object>> requestCallback = new Dictionary<uint, Action<object>>();
|
|
|
private readonly AChannel channel;
|
|
|
private bool isRpc;
|
|
|
|
|
|
- public Session(NetworkComponent network, AChannel channel)
|
|
|
+ private readonly IMessagePacker messagePacker;
|
|
|
+
|
|
|
+ public Session(NetworkComponent network, AChannel channel, IMessagePacker messagePacker)
|
|
|
{
|
|
|
this.network = network;
|
|
|
this.channel = channel;
|
|
|
+ this.messagePacker = messagePacker;
|
|
|
this.StartRecv();
|
|
|
}
|
|
|
|
|
|
@@ -64,13 +65,12 @@ namespace Model
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- if (messageBytes.Length < 6)
|
|
|
+ if (messageBytes.Length < 3)
|
|
|
{
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
ushort opcode = BitConverter.ToUInt16(messageBytes, 0);
|
|
|
-
|
|
|
try
|
|
|
{
|
|
|
this.Run(opcode, messageBytes);
|
|
|
@@ -85,71 +85,71 @@ namespace Model
|
|
|
private void Run(ushort opcode, byte[] messageBytes)
|
|
|
{
|
|
|
int offset = 0;
|
|
|
- uint flag = BitConverter.ToUInt32(messageBytes, 2);
|
|
|
+ byte flag = messageBytes[2];
|
|
|
|
|
|
- bool isCompressed = (flag & 0x80000000) > 0;
|
|
|
+ bool isCompressed = (flag & 0x80) > 0;
|
|
|
+ const int opcodeAndFlagLength = 3;
|
|
|
if (isCompressed) // 最高位为1,表示有压缩,需要解压缩
|
|
|
{
|
|
|
- messageBytes = ZipHelper.Decompress(messageBytes, 6, messageBytes.Length - 6);
|
|
|
+ messageBytes = ZipHelper.Decompress(messageBytes, opcodeAndFlagLength, messageBytes.Length - opcodeAndFlagLength);
|
|
|
offset = 0;
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- offset = 6;
|
|
|
+ offset = opcodeAndFlagLength;
|
|
|
}
|
|
|
|
|
|
- this.RunDecompressedBytes(opcode, flag, messageBytes, offset);
|
|
|
+ this.RunDecompressedBytes(opcode, messageBytes, offset);
|
|
|
}
|
|
|
|
|
|
- private void RunDecompressedBytes(ushort opcode, uint flag, byte[] messageBytes, int offset)
|
|
|
+ private void RunDecompressedBytes(ushort opcode, byte[] messageBytes, int offset)
|
|
|
{
|
|
|
- uint rpcFlag = flag & 0x40000000;
|
|
|
- uint rpcId = flag & 0x3fffffff;
|
|
|
-
|
|
|
- // 普通消息或者是Rpc请求消息
|
|
|
- if (rpcFlag == 0)
|
|
|
+ Type messageType = this.network.Owner.GetComponent<OpcodeTypeComponent>().GetType(opcode);
|
|
|
+ object message = messagePacker.DeserializeFrom(messageType, messageBytes, offset, messageBytes.Length - offset);
|
|
|
+
|
|
|
+ if (message is AActorMessage)
|
|
|
{
|
|
|
- MessageInfo messageInfo = new MessageInfo(opcode, messageBytes, offset, rpcId);
|
|
|
- Type messageType = this.network.Owner.GetComponent<OpcodeTypeComponent>().GetType(messageInfo.Opcode);
|
|
|
- object message = MongoHelper.FromBson(messageType, messageInfo.MessageBytes, messageInfo.Offset, messageInfo.Count);
|
|
|
- messageInfo.Message = message;
|
|
|
- if (message is AActorMessage)
|
|
|
- {
|
|
|
- this.network.Owner.GetComponent<ActorMessageDispatherComponent>().Handle(this, messageInfo);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- this.network.Owner.GetComponent<MessageDispatherComponent>().Handle(this, messageInfo);
|
|
|
- }
|
|
|
+ this.network.Owner.GetComponent<ActorMessageDispatherComponent>().Handle(this, message);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (message is AMessage || message is ARequest)
|
|
|
+ {
|
|
|
+ this.network.Owner.GetComponent<MessageDispatherComponent>().Handle(this, message);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // rpcFlag>0 表示这是一个rpc响应消息
|
|
|
- // Rpc回调有找不着的可能,因为client可能取消Rpc调用
|
|
|
- if (!this.requestCallback.TryGetValue(rpcId, out Action<byte[], int, int> action))
|
|
|
+ if (message is AResponse response)
|
|
|
{
|
|
|
+ // rpcFlag>0 表示这是一个rpc响应消息
|
|
|
+ // Rpc回调有找不着的可能,因为client可能取消Rpc调用
|
|
|
+ if (!this.requestCallback.TryGetValue(response.RpcId, out Action<object> action))
|
|
|
+ {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ this.requestCallback.Remove(response.RpcId);
|
|
|
+ action(message);
|
|
|
return;
|
|
|
}
|
|
|
- this.requestCallback.Remove(rpcId);
|
|
|
- action(messageBytes, offset, messageBytes.Length - offset);
|
|
|
+
|
|
|
+ throw new Exception($"message type error: {message.GetType().FullName}");
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
/// Rpc调用
|
|
|
/// </summary>
|
|
|
public Task<Response> Call<Request, Response>(Request request, CancellationToken cancellationToken) where Request : ARequest
|
|
|
- where Response : AResponse
|
|
|
+ where Response : AResponse
|
|
|
{
|
|
|
-
|
|
|
- this.SendMessage(++RpcId, request);
|
|
|
+ request.RpcId = ++RpcId;
|
|
|
+ this.SendMessage(request);
|
|
|
|
|
|
var tcs = new TaskCompletionSource<Response>();
|
|
|
|
|
|
- this.requestCallback[RpcId] = (bytes, offset, count) =>
|
|
|
+ this.requestCallback[RpcId] = (message) =>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
|
|
|
+ Response response = (Response)message;
|
|
|
if (response.Error != 0)
|
|
|
{
|
|
|
tcs.SetException(new RpcException(response.Error, response.Message));
|
|
|
@@ -175,20 +175,21 @@ namespace Model
|
|
|
/// </summary>
|
|
|
public Task<Response> Call<Request, Response>(Request request) where Request : ARequest where Response : AResponse
|
|
|
{
|
|
|
- this.SendMessage(++RpcId, request);
|
|
|
+ request.RpcId = ++RpcId;
|
|
|
+ this.SendMessage(request);
|
|
|
|
|
|
var tcs = new TaskCompletionSource<Response>();
|
|
|
- this.requestCallback[RpcId] = (bytes, offset, count) =>
|
|
|
+ this.requestCallback[RpcId] = (message) =>
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
|
|
|
+ Response response = (Response)message;
|
|
|
if (response.Error != 0)
|
|
|
{
|
|
|
tcs.SetException(new RpcException(response.Error, response.Message));
|
|
|
return;
|
|
|
}
|
|
|
- //Log.Info($"recv: {response.ToJson()}");
|
|
|
+ //Log.Debug($"recv: {response.ToJson()}");
|
|
|
this.isRpc = true;
|
|
|
tcs.SetResult(response);
|
|
|
}
|
|
|
@@ -207,42 +208,39 @@ namespace Model
|
|
|
{
|
|
|
throw new Exception("session已经被Dispose了");
|
|
|
}
|
|
|
- this.SendMessage(0, message);
|
|
|
+ this.SendMessage(message);
|
|
|
}
|
|
|
|
|
|
- public void Reply<Response>(uint rpcId, Response message)
|
|
|
+ public void Reply<Response>(Response message) where Response : AResponse
|
|
|
{
|
|
|
if (this.Id == 0)
|
|
|
{
|
|
|
throw new Exception("session已经被Dispose了");
|
|
|
}
|
|
|
- this.SendMessage(rpcId, message, false);
|
|
|
+ this.SendMessage(message);
|
|
|
}
|
|
|
|
|
|
- private void SendMessage(uint rpcId, object message, bool isCall = true)
|
|
|
+ private void SendMessage(object message)
|
|
|
{
|
|
|
//Log.Debug($"send: {message.ToJson()}");
|
|
|
ushort opcode = this.network.Owner.GetComponent<OpcodeTypeComponent>().GetOpcode(message.GetType());
|
|
|
byte[] opcodeBytes = BitConverter.GetBytes(opcode);
|
|
|
- if (!isCall)
|
|
|
- {
|
|
|
- rpcId = rpcId | 0x40000000;
|
|
|
- }
|
|
|
|
|
|
- byte[] messageBytes = message.ToBson();
|
|
|
+ byte[] messageBytes = messagePacker.SerializeToByteArray(message);
|
|
|
+ byte flag = 0;
|
|
|
if (messageBytes.Length > 100)
|
|
|
{
|
|
|
byte[] newMessageBytes = ZipHelper.Compress(messageBytes);
|
|
|
if (newMessageBytes.Length < messageBytes.Length)
|
|
|
{
|
|
|
messageBytes = newMessageBytes;
|
|
|
- rpcId = rpcId | 0x80000000;
|
|
|
+ flag |= 0x80;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- byte[] seqBytes = BitConverter.GetBytes(rpcId);
|
|
|
+ byte[] flagBytes = { flag };
|
|
|
|
|
|
- channel.Send(new List<byte[]> { opcodeBytes, seqBytes, messageBytes });
|
|
|
+ channel.Send(new List<byte[]> { opcodeBytes, flagBytes, messageBytes });
|
|
|
}
|
|
|
|
|
|
public override void Dispose()
|