using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Common.Base;
using Common.Helper;
using Common.Network;
using TNet;
using UNet;

namespace Model
{
    /// <summary>
    /// Creates the network service, accepts channels, routes every received
    /// message either to a pending RPC callback or to the event system, and
    /// offers a request/response (RPC) API on top of a channel.
    /// </summary>
    /// <remarks>
    /// Message layout as read and written by this class: bytes 0-1 hold the
    /// opcode/type, and for RPC messages bytes 2-5 hold the request id and the
    /// BSON payload starts at offset 6.
    /// NOTE(review): the generic type arguments of this file were lost before
    /// review. Those that follow from usage were restored; the ones marked
    /// NOTE(review) below, and the base type (kept as written), must be
    /// confirmed against the project.
    /// </remarks>
    public class NetworkComponent: Component, IUpdate, IStart
    {
        // Offset of the request id inside an RPC message (right after the 2-byte opcode).
        private const int RpcIdOffset = 2;

        // Offset of the BSON payload inside an RPC message (2-byte opcode + 4-byte id).
        private const int RpcBodyOffset = 6;

        private IService service;

        // Last request id handed out by Request.
        private int requestId;

        // Pending RPC callbacks keyed by request id; invoked with (message, isOK).
        private readonly Dictionary<int, Action<byte[], bool>> requestCallback =
                new Dictionary<int, Action<byte[], bool>>();

        /// <summary>
        /// Creates the service for the given protocol and starts accepting channels.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">protocol is neither TCP nor UDP.</exception>
        private void Accept(string host, int port, NetworkProtocol protocol = NetworkProtocol.TCP)
        {
            switch (protocol)
            {
                case NetworkProtocol.TCP:
                    this.service = new TService(host, port);
                    break;
                case NetworkProtocol.UDP:
                    this.service = new UService(host, port);
                    break;
                default:
                    throw new ArgumentOutOfRangeException("protocol");
            }

            this.AcceptChannel();
        }

        /// <summary>
        /// Starts the service using the host, port and protocol from World.Instance.Options.
        /// </summary>
        public void Start()
        {
            this.Accept(World.Instance.Options.Host, World.Instance.Options.Port, World.Instance.Options.Protocol);
        }

        /// <summary>
        /// Forwards the update tick to the underlying service.
        /// </summary>
        public void Update()
        {
            this.service.Update();
        }

        /// <summary>
        /// Accepts connections: waits for the next channel forever and starts a
        /// receive loop for each one.
        /// </summary>
        private async void AcceptChannel()
        {
            while (true)
            {
                AChannel channel = await this.service.GetChannel();
                this.ProcessChannel(channel);
            }
        }

        /// <summary>
        /// Receives packets from one channel and dispatches them.
        /// </summary>
        /// <param name="channel">Channel to read from.</param>
        private async void ProcessChannel(AChannel channel)
        {
            while (true)
            {
                byte[] message = await channel.RecvAsync();

                // NOTE(review): a message shorter than the header makes BitConverter
                // throw here; what RecvAsync yields on a closed channel is not visible
                // in this file, so no policy for that case is imposed.
                int opcode = BitConverter.ToUInt16(message, 0);

                // This opcode range means the message is an RPC response.
                if (MessageTypeHelper.IsRpcResponseMessage(opcode))
                {
                    int id = BitConverter.ToInt32(message, RpcIdOffset);
                    this.RequestCallback(channel, id, message, true);
                    continue;
                }

                // Only event handlers consume the environment, so build it after the RPC branch.
                Env env = new Env();
                env[EnvKey.Channel] = channel;
                env[EnvKey.Message] = message;

                // Original note: if the message is addressed to a client, this is the
                // gate server, which has to find the channel by unit id and send it on.
                // That lookup is left to the GateRecvServerMessage handlers.
                // NOTE(review): the type argument of GetComponent was lost in this file;
                // EventComponent<EventAttribute> is a reconstruction - confirm.
                if (MessageTypeHelper.IsServerMessage(opcode))
                {
                    World.Instance.GetComponent<EventComponent<EventAttribute>>().Run(EventType.GateRecvServerMessage, env);
                    continue;
                }

                // Dispatch the message to the logic handlers.
                World.Instance.GetComponent<EventComponent<EventAttribute>>().Run(EventType.LogicRecvMessage, env);
            }
        }

        /// <summary>
        /// Sends a buffer over the channel the service returns for the address.
        /// </summary>
        public void SendAsync(string address, byte[] buffer)
        {
            AChannel channel = this.service.GetChannel(address);
            channel.SendAsync(buffer);
        }

        /// <summary>
        /// Message callback or timeout callback: completes the pending request
        /// registered under the id, if there is one.
        /// </summary>
        /// <param name="channel">Channel the event relates to (not used here).</param>
        /// <param name="id">Request id.</param>
        /// <param name="buffer">Full response message, or null on timeout.</param>
        /// <param name="isOK">True for a response, false for a timeout.</param>
        public void RequestCallback(AChannel channel, int id, byte[] buffer, bool isOK)
        {
            Action<byte[], bool> action;
            if (!this.requestCallback.TryGetValue(id, out action))
            {
                return;
            }

            // Unregister before invoking so the callback runs at most once, even
            // if it throws or a timeout fires later for the same id.
            this.requestCallback.Remove(id);
            action(buffer, isOK);
        }

        /// <summary>
        /// RPC request: sends type, request id and the BSON-encoded request, and
        /// returns a task that completes with the decoded response.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the order of the type parameters (T, K) is a reconstruction;
        /// confirm against existing call sites.
        /// </remarks>
        /// <typeparam name="T">Response type.</typeparam>
        /// <typeparam name="K">Request type.</typeparam>
        /// <param name="channel">Channel to send the request on.</param>
        /// <param name="type">Message type written into the first two bytes.</param>
        /// <param name="request">Request object, serialized with MongoHelper.ToBson.</param>
        /// <param name="waitTime">
        /// When greater than zero, a timer is added at TimeHelper.Now() + waitTime
        /// that fails the request if it is still pending.
        /// </param>
        /// <returns>
        /// Task that yields the response, or faults with TimeoutException on timeout
        /// or with the deserialization error if the response cannot be decoded.
        /// </returns>
        public Task<T> Request<T, K>(AChannel channel, short type, K request, int waitTime = 0)
        {
            // Snapshot the id. The timeout closure must refer to this request; reading
            // the field when the timer fires would hit whichever request was issued last.
            int id = ++this.requestId;

            byte[] requestBuffer = MongoHelper.ToBson(request);
            byte[] typeBuffer = BitConverter.GetBytes(type);
            byte[] idBuffer = BitConverter.GetBytes(id);
            channel.SendAsync(new List<byte[]> { typeBuffer, idBuffer, requestBuffer });

            var tcs = new TaskCompletionSource<T>();
            this.requestCallback[id] = (buffer, isOK) =>
            {
                if (!isOK)
                {
                    tcs.SetException(new TimeoutException(
                            string.Format("rpc timeout {0} {1}", type, MongoHelper.ToJson(request))));
                    return;
                }

                T response;
                try
                {
                    response = MongoHelper.FromBson<T>(buffer, RpcBodyOffset);
                }
                catch (Exception e)
                {
                    // Surface a malformed response on the task instead of letting it
                    // escape into the receive loop and leave the caller waiting.
                    tcs.SetException(e);
                    return;
                }

                tcs.SetResult(response);
            };

            if (waitTime > 0)
            {
                this.service.Timer.Add(TimeHelper.Now() + waitTime,
                        () => { this.RequestCallback(channel, id, null, false); });
            }

            return tcs.Task;
        }

        /// <summary>
        /// RPC response: sends type, the id of the request being answered and the
        /// BSON-encoded response.
        /// </summary>
        /// <typeparam name="T">Response type.</typeparam>
        /// <param name="channel">Channel to send the response on.</param>
        /// <param name="type">Message type written into the first two bytes.</param>
        /// <param name="id">Id of the request being answered.</param>
        /// <param name="response">Response object, serialized with MongoHelper.ToBson.</param>
        public void Response<T>(AChannel channel, short type, int id, T response)
        {
            byte[] responseBuffer = MongoHelper.ToBson(response);
            byte[] typeBuffer = BitConverter.GetBytes(type);
            byte[] idBuffer = BitConverter.GetBytes(id);
            channel.SendAsync(new List<byte[]> { typeBuffer, idBuffer, responseBuffer });
        }
    }
}