|
|
@@ -7,6 +7,7 @@ using System.Threading.Tasks;
|
|
|
using Common.Base;
|
|
|
using Common.Helper;
|
|
|
using Common.Network;
|
|
|
+using MongoDB.Bson;
|
|
|
using TNet;
|
|
|
using UNet;
|
|
|
|
|
|
@@ -88,73 +89,67 @@ namespace Model
|
|
|
{
|
|
|
while (true)
|
|
|
{
|
|
|
- byte[] message = await channel.RecvAsync();
|
|
|
- Env env = new Env();
|
|
|
- env[EnvKey.Channel] = channel;
|
|
|
- env[EnvKey.Message] = message;
|
|
|
- ushort opcode = BitConverter.ToUInt16(message, 0);
|
|
|
- env[EnvKey.Opcode] = opcode;
|
|
|
-
|
|
|
- // 表示消息是rpc响应消息
|
|
|
- if (opcode == Opcode.RpcResponse)
|
|
|
- {
|
|
|
- int id = BitConverter.ToInt32(message, 2);
|
|
|
- this.RequestCallback(channel, id, message, RpcResponseStatus.Succee);
|
|
|
- continue;
|
|
|
- }
|
|
|
+ byte[] messageBytes = await channel.RecvAsync();
|
|
|
+ Opcode opcode = (Opcode)BitConverter.ToUInt16(messageBytes, 0);
|
|
|
|
|
|
// rpc异常
|
|
|
if (opcode == Opcode.RpcException)
|
|
|
{
|
|
|
- int id = BitConverter.ToInt32(message, 2);
|
|
|
- this.RequestCallback(channel, id, message, RpcResponseStatus.Exception);
|
|
|
+ int id = BitConverter.ToInt32(messageBytes, 2);
|
|
|
+ this.RpcCallback(channel, id, messageBytes, RpcResponseStatus.Exception);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- // 如果是server message(发给client的消息),说明这是gate server,需要根据unitid查到channel,进行发送
|
|
|
- if (MessageTypeHelper.IsServerMessage(opcode))
|
|
|
+ // 表示消息是rpc响应消息
|
|
|
+ if (opcode == Opcode.RpcResponse)
|
|
|
{
|
|
|
-#pragma warning disable 4014
|
|
|
- World.Instance.GetComponent<EventComponent<EventAttribute>>()
|
|
|
- .RunAsync(EventType.GateRecvServerMessage, env);
|
|
|
-#pragma warning restore 4014
|
|
|
+ int id = BitConverter.ToInt32(messageBytes, 2);
|
|
|
+ this.RpcCallback(channel, id, messageBytes, RpcResponseStatus.Succee);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- // 进行消息分发
|
|
|
- if (MessageTypeHelper.IsClientMessage(opcode))
|
|
|
+ // 如果是server message(发给client的消息),说明这是gate server,需要根据unitid查到channel,进行发送
|
|
|
+ if (MessageTypeHelper.IsServerMessage(opcode))
|
|
|
{
|
|
|
-#pragma warning disable 4014
|
|
|
- World.Instance.GetComponent<EventComponent<EventAttribute>>()
|
|
|
- .RunAsync(EventType.LogicRecvClientMessage, env);
|
|
|
-#pragma warning restore 4014
|
|
|
+ byte[] idBuffer = new byte[12];
|
|
|
+ Array.Copy(messageBytes, 2, idBuffer, 0, 12);
|
|
|
+ ObjectId unitId = new ObjectId(idBuffer);
|
|
|
+ byte[] buffer = new byte[messageBytes.Length - 6];
|
|
|
+ Array.Copy(messageBytes, 6, buffer, 0, buffer.Length);
|
|
|
+ World.Instance.GetComponent<GateNetworkComponent>().SendAsync(unitId, buffer);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- if (MessageTypeHelper.IsRpcRequestMessage(opcode))
|
|
|
- {
|
|
|
-#pragma warning disable 4014
|
|
|
- World.Instance.GetComponent<EventComponent<EventAttribute>>()
|
|
|
- .RunAsync(EventType.LogicRecvRequestMessage, env);
|
|
|
-#pragma warning restore 4014
|
|
|
- }
|
|
|
+ // 处理Rpc请求,并且返回结果
|
|
|
+ RpcDo(channel, opcode, messageBytes);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void SendAsync(string address, byte[] buffer)
|
|
|
- {
|
|
|
- AChannel channel = this.service.GetChannel(address);
|
|
|
- channel.SendAsync(buffer);
|
|
|
- }
|
|
|
-
|
|
|
- public void SendAsync(string address, List<byte[]> buffers)
|
|
|
+ private async static void RpcDo(AChannel channel, Opcode opcode, byte[] messageBytes)
|
|
|
{
|
|
|
- AChannel channel = this.service.GetChannel(address);
|
|
|
- channel.SendAsync(buffers);
|
|
|
+ byte[] opcodeBuffer;
|
|
|
+ int id = BitConverter.ToInt32(messageBytes, 2);
|
|
|
+ byte[] idBuffer = BitConverter.GetBytes(id);
|
|
|
+ try
|
|
|
+ {
|
|
|
+ opcodeBuffer = BitConverter.GetBytes((ushort)Opcode.RpcResponse);
|
|
|
+ byte[] result = await World.Instance.GetComponent<MessageComponent>().RunAsync(opcode, messageBytes);
|
|
|
+ channel.SendAsync(new List<byte[]> { opcodeBuffer, idBuffer, result });
|
|
|
+ }
|
|
|
+ catch (Exception e)
|
|
|
+ {
|
|
|
+ opcodeBuffer = BitConverter.GetBytes((ushort)Opcode.RpcException);
|
|
|
+ BinaryFormatter formatter = new BinaryFormatter(null, new StreamingContext(StreamingContextStates.All));
|
|
|
+ using (MemoryStream stream = new MemoryStream())
|
|
|
+ {
|
|
|
+ formatter.Serialize(stream, e);
|
|
|
+ channel.SendAsync(new List<byte[]> { opcodeBuffer, idBuffer, stream.ToArray() });
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// 消息回调或者超时回调
|
|
|
- public void RequestCallback(AChannel channel, int id, byte[] buffer, RpcResponseStatus responseStatus)
|
|
|
+ public void RpcCallback(AChannel channel, int id, byte[] buffer, RpcResponseStatus responseStatus)
|
|
|
{
|
|
|
Action<byte[], RpcResponseStatus> action;
|
|
|
if (!this.requestCallback.TryGetValue(id, out action))
|
|
|
@@ -168,22 +163,23 @@ namespace Model
|
|
|
/// <summary>
|
|
|
/// Rpc请求
|
|
|
/// </summary>
|
|
|
- public Task<T> RpcRequest<T, K>(string address, short type, K request, int waitTime = 0)
|
|
|
+ public Task<T> RpcCall<T, K>(string address, K request, int waitTime = 0)
|
|
|
{
|
|
|
AChannel channel = this.service.GetChannel(address);
|
|
|
|
|
|
++this.requestId;
|
|
|
byte[] requestBuffer = MongoHelper.ToBson(request);
|
|
|
- byte[] typeBuffer = BitConverter.GetBytes(type);
|
|
|
+ Opcode opcode = (Opcode)Enum.Parse(typeof(Opcode), request.GetType().Name);
|
|
|
+ byte[] opcodeBuffer = BitConverter.GetBytes((ushort)opcode);
|
|
|
byte[] idBuffer = BitConverter.GetBytes(this.requestId);
|
|
|
- channel.SendAsync(new List<byte[]> { typeBuffer, idBuffer, requestBuffer });
|
|
|
+ channel.SendAsync(new List<byte[]> { opcodeBuffer, idBuffer, requestBuffer });
|
|
|
var tcs = new TaskCompletionSource<T>();
|
|
|
this.requestCallback[this.requestId] = (messageBytes, status) =>
|
|
|
{
|
|
|
if (status == RpcResponseStatus.Timeout)
|
|
|
{
|
|
|
tcs.SetException(new Exception(
|
|
|
- string.Format("rpc timeout {0} {1}", type, MongoHelper.ToJson(request))));
|
|
|
+ string.Format("rpc timeout {0} {1}", opcode, MongoHelper.ToJson(request))));
|
|
|
return;
|
|
|
}
|
|
|
if (status == RpcResponseStatus.Exception)
|
|
|
@@ -207,35 +203,9 @@ namespace Model
|
|
|
if (waitTime > 0)
|
|
|
{
|
|
|
this.service.Timer.Add(TimeHelper.Now() + waitTime,
|
|
|
- () => { this.RequestCallback(channel, this.requestId, null, RpcResponseStatus.Timeout); });
|
|
|
+ () => { this.RpcCallback(channel, this.requestId, null, RpcResponseStatus.Timeout); });
|
|
|
}
|
|
|
return tcs.Task;
|
|
|
}
|
|
|
-
|
|
|
- /// <summary>
|
|
|
- /// Rpc响应
|
|
|
- /// </summary>
|
|
|
- public void RpcResponse<T>(AChannel channel, int id, T response)
|
|
|
- {
|
|
|
- byte[] responseBuffer = MongoHelper.ToBson(response);
|
|
|
- byte[] typeBuffer = BitConverter.GetBytes(Opcode.RpcResponse);
|
|
|
- byte[] idBuffer = BitConverter.GetBytes(id);
|
|
|
- channel.SendAsync(new List<byte[]> { typeBuffer, idBuffer, responseBuffer });
|
|
|
- }
|
|
|
-
|
|
|
- /// <summary>
|
|
|
- /// Rpc响应
|
|
|
- /// </summary>
|
|
|
- public void RpcException(AChannel channel, int id, Exception e)
|
|
|
- {
|
|
|
- byte[] opcodeBuffer = BitConverter.GetBytes(Opcode.RpcException);
|
|
|
- byte[] idBuffer = BitConverter.GetBytes(id);
|
|
|
- BinaryFormatter formatter = new BinaryFormatter(null, new StreamingContext(StreamingContextStates.All));
|
|
|
- using (MemoryStream stream = new MemoryStream())
|
|
|
- {
|
|
|
- formatter.Serialize(stream, e);
|
|
|
- channel.SendAsync(new List<byte[]> { opcodeBuffer, idBuffer, stream.ToArray() });
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
}
|