|
|
@@ -1,6 +1,10 @@
|
|
|
using System;
|
|
|
+using System.Collections.Generic;
|
|
|
+using System.Threading.Tasks;
|
|
|
using Common.Base;
|
|
|
+using Common.Helper;
|
|
|
using Common.Network;
|
|
|
+using MongoDB.Bson;
|
|
|
using TNet;
|
|
|
using UNet;
|
|
|
|
|
|
@@ -10,6 +14,10 @@ namespace Model
|
|
|
{
|
|
|
private IService service;
|
|
|
|
|
|
+ private int requestId;
|
|
|
+
|
|
|
+ private readonly Dictionary<int, Action<byte[], bool>> requestCallback = new Dictionary<int, Action<byte[], bool>>();
|
|
|
+
|
|
|
private void Accept(string host, int port, NetworkProtocol protocol = NetworkProtocol.TCP)
|
|
|
{
|
|
|
switch (protocol)
|
|
|
@@ -46,7 +54,7 @@ namespace Model
|
|
|
while (true)
|
|
|
{
|
|
|
AChannel channel = await this.service.GetChannel();
|
|
|
- ProcessChannel(channel);
|
|
|
+ this.ProcessChannel(channel);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -54,7 +62,7 @@ namespace Model
|
|
|
/// 接收分发封包
|
|
|
/// </summary>
|
|
|
/// <param name="channel"></param>
|
|
|
- private static async void ProcessChannel(AChannel channel)
|
|
|
+ private async void ProcessChannel(AChannel channel)
|
|
|
{
|
|
|
while (true)
|
|
|
{
|
|
|
@@ -63,20 +71,79 @@ namespace Model
|
|
|
env[EnvKey.Channel] = channel;
|
|
|
env[EnvKey.Message] = message;
|
|
|
int opcode = BitConverter.ToUInt16(message, 0);
|
|
|
+
|
|
|
+ if (MessageTypeHelper.IsClientMessage(opcode))
|
|
|
+ {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
// 这个区间表示消息是rpc响应消息
|
|
|
- if (opcode >= 40000 && opcode < 50000)
|
|
|
+ if (MessageTypeHelper.IsRpcResponseMessage(opcode))
|
|
|
{
|
|
|
int id = BitConverter.ToInt32(message, 2);
|
|
|
- channel.RequestCallback(id, message, true);
|
|
|
+ this.RequestCallback(channel, id, message, true);
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- // 进行消息解析分发
|
|
|
-#pragma warning disable 4014
|
|
|
- World.Instance.GetComponent<EventComponent<EventAttribute>>()
|
|
|
- .RunAsync(EventType.Message, env);
|
|
|
-#pragma warning restore 4014
|
|
|
+ // 进行消息分发
|
|
|
+ World.Instance.GetComponent<EventComponent<MessageAttribute>>().RunAsync(opcode, env);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 消息回调或者超时回调
|
|
|
+ public void RequestCallback(AChannel channel, int id, byte[] buffer, bool isOK)
|
|
|
+ {
|
|
|
+ Action<byte[], bool> action;
|
|
|
+ if (this.requestCallback.TryGetValue(id, out action))
|
|
|
+ {
|
|
|
+ action(buffer, isOK);
|
|
|
}
|
|
|
+ this.requestCallback.Remove(id);
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// Rpc请求
|
|
|
+ /// </summary>
|
|
|
+ public Task<T> Request<T, K>(AChannel channel, short type, K request, int waitTime = 0)
|
|
|
+ {
|
|
|
+ ++this.requestId;
|
|
|
+ byte[] requestBuffer = MongoHelper.ToBson(request);
|
|
|
+ byte[] typeBuffer = BitConverter.GetBytes(type);
|
|
|
+ byte[] idBuffer = BitConverter.GetBytes(this.requestId);
|
|
|
+ channel.SendAsync(new List<byte[]> { typeBuffer, idBuffer, requestBuffer });
|
|
|
+ var tcs = new TaskCompletionSource<T>();
|
|
|
+ this.requestCallback[this.requestId] = (e, b) =>
|
|
|
+ {
|
|
|
+ if (b)
|
|
|
+ {
|
|
|
+ T response = MongoHelper.FromBson<T>(e, 6);
|
|
|
+ tcs.SetResult(response);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ tcs.SetException(new Exception(string.Format("rpc timeout {0} {1}", type, MongoHelper.ToJson(request))));
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ if (waitTime > 0)
|
|
|
+ {
|
|
|
+ this.service.Timer.Add(TimeHelper.Now() + waitTime, () =>
|
|
|
+ {
|
|
|
+ this.RequestCallback(channel, this.requestId, null, false);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ return tcs.Task;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// Rpc响应
|
|
|
+ /// </summary>
|
|
|
+ public void Response<T>(AChannel channel, short type, int id, T response)
|
|
|
+ {
|
|
|
+ byte[] responseBuffer = MongoHelper.ToBson(response);
|
|
|
+ byte[] typeBuffer = BitConverter.GetBytes(type);
|
|
|
+ byte[] idBuffer = BitConverter.GetBytes(id);
|
|
|
+ channel.SendAsync(new List<byte[]> { typeBuffer, idBuffer, responseBuffer });
|
|
|
}
|
|
|
}
|
|
|
}
|