|
|
@@ -9,17 +9,23 @@ using UNet;
|
|
|
|
|
|
namespace Model
|
|
|
{
|
|
|
- public enum RpcStatus
|
|
|
+ public enum RpcResponseStatus
|
|
|
{
|
|
|
- OK,
|
|
|
+ Succee,
|
|
|
Timeout,
|
|
|
Exception,
|
|
|
}
|
|
|
|
|
|
public class RpcExcetionInfo
|
|
|
{
|
|
|
- public int ErrorCode { get; set; }
|
|
|
- public string ErrorInfo { get; set; }
|
|
|
+ public int ErrorCode { get; private set; }
|
|
|
+ public string ErrorInfo { get; private set; }
|
|
|
+
|
|
|
+ public RpcExcetionInfo(int errorCode, string errorInfo)
|
|
|
+ {
|
|
|
+ this.ErrorCode = errorCode;
|
|
|
+ this.ErrorInfo = errorInfo;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public class NetworkComponent: Component<World>, IUpdate, IStart
|
|
|
@@ -28,8 +34,8 @@ namespace Model
|
|
|
|
|
|
private int requestId;
|
|
|
|
|
|
- private readonly Dictionary<int, Action<byte[], RpcStatus>> requestCallback =
|
|
|
- new Dictionary<int, Action<byte[], RpcStatus>>();
|
|
|
+ private readonly Dictionary<int, Action<byte[], RpcResponseStatus>> requestCallback =
|
|
|
+ new Dictionary<int, Action<byte[], RpcResponseStatus>>();
|
|
|
|
|
|
private void Accept(string host, int port, NetworkProtocol protocol = NetworkProtocol.TCP)
|
|
|
{
|
|
|
@@ -83,27 +89,46 @@ namespace Model
|
|
|
Env env = new Env();
|
|
|
env[EnvKey.Channel] = channel;
|
|
|
env[EnvKey.Message] = message;
|
|
|
- int opcode = BitConverter.ToUInt16(message, 0);
|
|
|
+ ushort opcode = BitConverter.ToUInt16(message, 0);
|
|
|
+ env[EnvKey.Opcode] = opcode;
|
|
|
|
|
|
// 表示消息是rpc响应消息
|
|
|
- if (opcode == 0)
|
|
|
+ if (opcode == Opcode.RpcResponse)
|
|
|
{
|
|
|
int id = BitConverter.ToInt32(message, 2);
|
|
|
- this.RequestCallback(channel, id, message, RpcStatus.OK);
|
|
|
+ this.RequestCallback(channel, id, message, RpcResponseStatus.Succee);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- // 如果是发给client的消息,说明这是gate server,需要根据unitid查到channel,进行发送
|
|
|
+ // rpc异常
|
|
|
+ if (opcode == Opcode.RpcException)
|
|
|
+ {
|
|
|
+ int id = BitConverter.ToInt32(message, 2);
|
|
|
+ this.RequestCallback(channel, id, message, RpcResponseStatus.Exception);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果是server message(发给client的消息),说明这是gate server,需要根据unitid查到channel,进行发送
|
|
|
if (MessageTypeHelper.IsServerMessage(opcode))
|
|
|
{
|
|
|
World.Instance.GetComponent<EventComponent<EventAttribute>>()
|
|
|
- .Run(EventType.GateRecvServerMessage, env);
|
|
|
+ .RunAsync(EventType.GateRecvServerMessage, env);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
// 进行消息分发
|
|
|
- World.Instance.GetComponent<EventComponent<EventAttribute>>()
|
|
|
- .Run(EventType.LogicRecvMessage, env);
|
|
|
+ if (MessageTypeHelper.IsClientMessage(opcode))
|
|
|
+ {
|
|
|
+ World.Instance.GetComponent<EventComponent<EventAttribute>>()
|
|
|
+ .RunAsync(EventType.LogicRecvClientMessage, env);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (MessageTypeHelper.IsRpcRequestMessage(opcode))
|
|
|
+ {
|
|
|
+ World.Instance.GetComponent<EventComponent<EventAttribute>>()
|
|
|
+ .RunAsync(EventType.LogicRecvRpcMessage, env);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -113,23 +138,31 @@ namespace Model
|
|
|
channel.SendAsync(buffer);
|
|
|
}
|
|
|
|
|
|
+ public void SendAsync(string address, List<byte[]> buffers)
|
|
|
+ {
|
|
|
+ AChannel channel = this.service.GetChannel(address);
|
|
|
+ channel.SendAsync(buffers);
|
|
|
+ }
|
|
|
+
|
|
|
// 消息回调或者超时回调
|
|
|
- public void RequestCallback(AChannel channel, int id, byte[] buffer, RpcStatus status)
|
|
|
+ public void RequestCallback(AChannel channel, int id, byte[] buffer, RpcResponseStatus responseStatus)
|
|
|
{
|
|
|
- Action<byte[], RpcStatus> action;
|
|
|
+ Action<byte[], RpcResponseStatus> action;
|
|
|
if (!this.requestCallback.TryGetValue(id, out action))
|
|
|
{
|
|
|
return;
|
|
|
}
|
|
|
- action(buffer, status);
|
|
|
this.requestCallback.Remove(id);
|
|
|
+ action(buffer, responseStatus);
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
/// Rpc请求
|
|
|
/// </summary>
|
|
|
- public Task<T> Request<T, K>(AChannel channel, short type, K request, int waitTime = 0)
|
|
|
+ public Task<T> RpcRequest<T, K>(string address, short type, K request, int waitTime = 0)
|
|
|
{
|
|
|
+ AChannel channel = this.service.GetChannel(address);
|
|
|
+
|
|
|
++this.requestId;
|
|
|
byte[] requestBuffer = MongoHelper.ToBson(request);
|
|
|
byte[] typeBuffer = BitConverter.GetBytes(type);
|
|
|
@@ -138,13 +171,13 @@ namespace Model
|
|
|
var tcs = new TaskCompletionSource<T>();
|
|
|
this.requestCallback[this.requestId] = (e, b) =>
|
|
|
{
|
|
|
- if (b == RpcStatus.Timeout)
|
|
|
+ if (b == RpcResponseStatus.Timeout)
|
|
|
{
|
|
|
tcs.SetException(new Exception(
|
|
|
string.Format("rpc timeout {0} {1}", type, MongoHelper.ToJson(request))));
|
|
|
return;
|
|
|
}
|
|
|
- if (b == RpcStatus.Exception)
|
|
|
+ if (b == RpcResponseStatus.Exception)
|
|
|
{
|
|
|
RpcExcetionInfo errorInfo = MongoHelper.FromBson<RpcExcetionInfo>(e, 8);
|
|
|
tcs.SetException(new Exception(
|
|
|
@@ -152,7 +185,7 @@ namespace Model
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // RpcStatus.OK
|
|
|
+ // RpcResponseStatus.Succee
|
|
|
T response = MongoHelper.FromBson<T>(e, 6);
|
|
|
tcs.SetResult(response);
|
|
|
};
|
|
|
@@ -160,7 +193,7 @@ namespace Model
|
|
|
if (waitTime > 0)
|
|
|
{
|
|
|
this.service.Timer.Add(TimeHelper.Now() + waitTime,
|
|
|
- () => { this.RequestCallback(channel, this.requestId, null, RpcStatus.Timeout); });
|
|
|
+ () => { this.RequestCallback(channel, this.requestId, null, RpcResponseStatus.Timeout); });
|
|
|
}
|
|
|
return tcs.Task;
|
|
|
}
|
|
|
@@ -168,10 +201,10 @@ namespace Model
|
|
|
/// <summary>
|
|
|
/// Rpc响应
|
|
|
/// </summary>
|
|
|
- public void Response<T>(AChannel channel, int id, T response)
|
|
|
+ public void RpcResponse<T>(AChannel channel, int id, T response)
|
|
|
{
|
|
|
byte[] responseBuffer = MongoHelper.ToBson(response);
|
|
|
- byte[] typeBuffer = BitConverter.GetBytes(0);
|
|
|
+ byte[] typeBuffer = BitConverter.GetBytes(Opcode.RpcResponse);
|
|
|
byte[] idBuffer = BitConverter.GetBytes(id);
|
|
|
channel.SendAsync(new List<byte[]> { typeBuffer, idBuffer, responseBuffer });
|
|
|
}
|
|
|
@@ -179,11 +212,11 @@ namespace Model
|
|
|
/// <summary>
|
|
|
/// Rpc响应
|
|
|
/// </summary>
|
|
|
- public void ResponseException(AChannel channel, int id, int errorCode, string errorInfo)
|
|
|
+ public void RpcException(AChannel channel, int id, int errorCode, string errorInfo)
|
|
|
{
|
|
|
- byte[] typeBuffer = BitConverter.GetBytes(0);
|
|
|
+ byte[] typeBuffer = BitConverter.GetBytes(Opcode.RpcException);
|
|
|
byte[] idBuffer = BitConverter.GetBytes(id);
|
|
|
- RpcExcetionInfo info = new RpcExcetionInfo { ErrorCode = errorCode, ErrorInfo = errorInfo };
|
|
|
+ RpcExcetionInfo info = new RpcExcetionInfo(errorCode, errorInfo);
|
|
|
byte[] responseBuffer = MongoHelper.ToBson(info);
|
|
|
channel.SendAsync(new List<byte[]> { typeBuffer, idBuffer, responseBuffer });
|
|
|
}
|