using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

namespace Base
{
    /// <summary>
    /// Identifies which of the component's channels a message travels over.
    /// </summary>
    public enum NetChannelType
    {
        Login,
        Gate,
        Battle,
    }

    // NOTE(review): the generic type arguments in this file were restored from the way
    // each value is used (the copy under review had every "<...>" list stripped).
    // Two of them cannot be derived from this file alone and must be confirmed:
    //   * Entity           - assumed static type of Component.Owner (see Register).
    //   * NetworkComponent - assumed component that exposes ConnectChannel (see Connect).

    /// <summary>
    /// Forwards the Load / Awake / Update object events to the attached
    /// <see cref="MessageComponent"/>.
    /// </summary>
    [ObjectEvent]
    public class MessageComponentEvent : ObjectEvent<MessageComponent>, ILoader, IAwake<SceneType>, IUpdate
    {
        public void Load()
        {
            this.GetValue().Load();
        }

        public void Awake(SceneType sceneType)
        {
            this.GetValue().Awake(sceneType);
        }

        public void Update()
        {
            this.GetValue().Update();
        }
    }

    /// <summary>
    /// Message dispatch component: owns the network channels, sends messages, and routes
    /// every received message to a pending RPC callback, a pending wait callback, or the
    /// handlers registered for its opcode.
    /// </summary>
    public class MessageComponent : Component
    {
        /// <summary>
        /// Size of the header in front of every received message body:
        /// 2 bytes opcode followed by 4 bytes flags / rpc id.
        /// </summary>
        private const int HeaderSize = 6;

        // Opcode ranges used by Send to pick the outgoing channel.
        private const ushort LoginOpcodeExclusiveMin = 7000;
        private const ushort LoginOpcodeExclusiveMax = 8000;
        private const ushort BattleOpcodeExclusiveMin = 0;
        private const ushort BattleOpcodeInclusiveMax = 1000;

        private SceneType SceneType;

        private uint RpcId { get; set; }

        /// <summary>Handlers registered through <see cref="Register{T}"/>, grouped by opcode.</summary>
        private Dictionary<Opcode, List<Action<byte[], int, int>>> events;

        /// <summary>Callbacks of in-flight RPC calls, keyed by rpc id.</summary>
        private readonly Dictionary<uint, Action<byte[], int, int>> requestCallback =
                new Dictionary<uint, Action<byte[], int, int>>();

        /// <summary>Callbacks of pending <c>WaitAsync</c> calls, keyed by the awaited opcode.</summary>
        private readonly Dictionary<Opcode, Action<byte[], int, int>> waitCallback =
                new Dictionary<Opcode, Action<byte[], int, int>>();

        private readonly Dictionary<NetChannelType, AChannel> channels =
                new Dictionary<NetChannelType, AChannel>();

        /// <summary>
        /// Stores the scene type and builds the handler table for it.
        /// </summary>
        /// <param name="sceneType">Scene whose message handlers should be registered.</param>
        public void Awake(SceneType sceneType)
        {
            this.SceneType = sceneType;
            this.Load();
        }

        /// <summary>
        /// Rebuilds the handler table: instantiates every type carrying a
        /// <c>MessageAttribute</c> whose scene type matches this component and lets it
        /// register its handlers.
        /// </summary>
        /// <exception cref="GameException">
        /// A type carrying a matching <c>MessageAttribute</c> does not implement <c>IMRegister</c>.
        /// </exception>
        public void Load()
        {
            this.events = new Dictionary<Opcode, List<Action<byte[], int, int>>>();

            Assembly[] assemblies = Object.ObjectManager.GetAssemblies();
            foreach (Assembly assembly in assemblies)
            {
                foreach (Type type in assembly.GetTypes())
                {
                    object[] attrs = type.GetCustomAttributes(typeof(MessageAttribute), false);
                    if (attrs.Length == 0)
                    {
                        continue;
                    }

                    MessageAttribute messageAttribute = (MessageAttribute)attrs[0];
                    if (messageAttribute.SceneType != this.SceneType)
                    {
                        continue;
                    }

                    object obj = Activator.CreateInstance(type);

                    // NOTE(review): type argument inferred from the Register(this) call below - confirm.
                    IMRegister<MessageComponent> iMRegister = obj as IMRegister<MessageComponent>;
                    if (iMRegister == null)
                    {
                        // The previous text named IEventSync / IEventAsync, which is not what is checked here.
                        throw new GameException($"message handler not inherit IMRegister interface: {obj.GetType().FullName}");
                    }

                    iMRegister.Register(this);
                }
            }
        }

        /// <summary>
        /// Registers a handler for messages of type <typeparamref name="T"/>. The opcode is
        /// looked up from the type name of <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Message type; its name must be a member of <c>Opcode</c>.</typeparam>
        /// <param name="action">Invoked with this component's owner and the decoded message.</param>
        public void Register<T>(Action<Entity, T> action)
        {
            Opcode opcode = EnumHelper.FromString<Opcode>(typeof(T).Name);

            List<Action<byte[], int, int>> actions;
            if (!this.events.TryGetValue(opcode, out actions))
            {
                actions = new List<Action<byte[], int, int>>();
                this.events.Add(opcode, actions);
            }

            actions.Add((messageBytes, offset, count) =>
            {
                T t;
                try
                {
                    t = MongoHelper.FromBson<T>(messageBytes, offset, count);
                }
                catch (Exception ex)
                {
                    throw new GameException("解释消息失败:" + opcode, ex);
                }

                if (OpcodeHelper.IsNeedDebugLogMessage(opcode))
                {
                    Log.Debug(MongoHelper.ToJson(t));
                }

                action(this.Owner, t);
            });
        }

        /// <summary>
        /// Opens a channel to <paramref name="host"/>:<paramref name="port"/> and stores it
        /// under <paramref name="channelType"/>.
        /// </summary>
        public void Connect(NetChannelType channelType, string host, int port)
        {
            // NOTE(review): a channel already stored under channelType is replaced without
            // being disposed. Whether it is safe to dispose it here depends on whether
            // ConnectChannel can hand out shared instances - confirm before changing.
            AChannel channel = Share.Scene.GetComponent<NetworkComponent>().ConnectChannel(host, port);
            this.channels[channelType] = channel;
        }

        /// <summary>
        /// Removes the channel stored under <paramref name="channelType"/> and disposes it
        /// unless its Id is already 0. Does nothing when no channel is stored.
        /// </summary>
        public void Close(NetChannelType channelType)
        {
            AChannel channel = this.GetChannel(channelType);
            if (channel == null)
            {
                return;
            }

            // Always drop the entry so a dead channel does not linger in the table.
            this.channels.Remove(channelType);

            // Id == 0 is the marker this file uses for an already disposed object.
            if (channel.Id != 0)
            {
                channel.Dispose();
            }
        }

        /// <summary>
        /// Drains and dispatches the received messages of every channel.
        /// </summary>
        public void Update()
        {
            // Iterate over a snapshot: handlers may call Connect / Close while we dispatch.
            foreach (AChannel channel in this.channels.Values.ToArray())
            {
                this.UpdateChannel(channel);
            }
        }

        /// <summary>
        /// Dispatches messages from <paramref name="channel"/> until <c>Recv</c> returns null.
        /// Exceptions raised while handling one message are logged and do not stop the loop.
        /// </summary>
        private void UpdateChannel(AChannel channel)
        {
            if (channel.Id == 0)
            {
                return;
            }

            while (true)
            {
                byte[] messageBytes = channel.Recv();
                if (messageBytes == null)
                {
                    return;
                }

                // Too short to contain the header: skip the packet.
                if (messageBytes.Length < HeaderSize)
                {
                    continue;
                }

                Opcode opcode = (Opcode)BitConverter.ToUInt16(messageBytes, 0);
                try
                {
                    this.Run(opcode, messageBytes);
                }
                catch (Exception e)
                {
                    Log.Error(e.ToString());
                }
            }
        }

        /// <summary>
        /// Decodes the flag word of a raw packet, decompresses the body when required and
        /// dispatches it.
        /// </summary>
        /// <param name="opcode">Opcode already read from the first two bytes.</param>
        /// <param name="messageBytes">Whole packet including the 6 byte header.</param>
        public void Run(Opcode opcode, byte[] messageBytes)
        {
            uint flagUInt = BitConverter.ToUInt32(messageBytes, 2);

            // The most significant byte of the flag word equal to 1 marks a compressed body.
            bool isCompressed = (byte)(flagUInt >> 24) == 1;

            int offset;
            if (isCompressed)
            {
                // Compressed: the decompressed buffer holds only the body, so it starts at 0.
                messageBytes = ZipHelper.Decompress(messageBytes, HeaderSize, messageBytes.Length - HeaderSize);
                offset = 0;
            }
            else
            {
                offset = HeaderSize;
            }

            // NOTE(review): only the low 12 bits are kept, while RpcId is a full uint that is
            // sent unmasked; ids above 0x0fff would never match their callback - confirm the
            // intended mask with the protocol definition.
            uint rpcId = flagUInt & 0x0fff;

            this.RunDecompressedBytes(opcode, rpcId, messageBytes, offset);
        }

        /// <summary>
        /// Dispatches a message body. Priority: pending RPC callback for
        /// <paramref name="rpcId"/>, then pending wait callback for <paramref name="opcode"/>,
        /// then every registered handler for <paramref name="opcode"/>.
        /// </summary>
        /// <param name="opcode">Opcode of the message.</param>
        /// <param name="rpcId">Rpc id carried by the packet.</param>
        /// <param name="messageBytes">Buffer holding the body.</param>
        /// <param name="offset">Index of the first body byte; the body runs to the end of the buffer.</param>
        public void RunDecompressedBytes(Opcode opcode, uint rpcId, byte[] messageBytes, int offset)
        {
            int count = messageBytes.Length - offset;

            Action<byte[], int, int> action;
            if (this.requestCallback.TryGetValue(rpcId, out action))
            {
                this.requestCallback.Remove(rpcId);
                action(messageBytes, offset, count);
                return;
            }

            if (this.waitCallback.TryGetValue(opcode, out action))
            {
                this.waitCallback.Remove(opcode);
                action(messageBytes, offset, count);
                return;
            }

            List<Action<byte[], int, int>> actions;
            if (!this.events.TryGetValue(opcode, out actions))
            {
                if (this.SceneType == SceneType.Game)
                {
                    Log.Error($"消息{opcode}没有处理");
                }
                return;
            }

            foreach (Action<byte[], int, int> ev in actions)
            {
                // One failing handler must not prevent the remaining ones from running.
                try
                {
                    ev(messageBytes, offset, count);
                }
                catch (Exception e)
                {
                    Log.Error(e.ToString());
                }
            }
        }

        /// <summary>
        /// RPC call: sends <paramref name="request"/> and completes when the response carrying
        /// the same rpc id arrives.
        /// </summary>
        /// <typeparam name="Response">Expected response type.</typeparam>
        /// <param name="request">Message to send.</param>
        /// <param name="cancellationToken">
        /// When cancelled, the pending callback of this call is removed and the returned task
        /// is cancelled.
        /// </param>
        /// <returns>
        /// A task that yields the response, faults with <c>RpcException</c> when the response
        /// reports an error code other than <c>ERR_Success</c>, or faults with
        /// <c>GameException</c> when the response cannot be processed.
        /// </returns>
        public Task<Response> CallAsync<Response>(object request, CancellationToken cancellationToken) where Response : IErrorMessage
        {
            // Capture the id of THIS call; this.RpcId keeps changing with later calls.
            uint rpcId = ++this.RpcId;
            this.Send(request, rpcId);

            var tcs = new TaskCompletionSource<Response>();
            this.requestCallback[rpcId] = (bytes, offset, count) =>
            {
                try
                {
                    Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
                    Opcode opcode = EnumHelper.FromString<Opcode>(response.GetType().Name);
                    if (OpcodeHelper.IsNeedDebugLogMessage(opcode))
                    {
                        Log.Debug(MongoHelper.ToJson(response));
                    }

                    if (response.ErrorMessage.errno != (int)ErrorCode.ERR_Success)
                    {
                        tcs.TrySetException(new RpcException((ErrorCode)response.ErrorMessage.errno, response.ErrorMessage.msg.Utf8ToStr()));
                        return;
                    }

                    tcs.TrySetResult(response);
                }
                catch (Exception e)
                {
                    tcs.TrySetException(new GameException($"Rpc Error: {typeof(Response).FullName}", e));
                }
            };

            // NOTE(review): the cancellation callback runs on whichever thread cancels the
            // token and touches requestCallback without synchronisation - confirm that tokens
            // are only cancelled from the thread that calls Update.
            cancellationToken.Register(() =>
            {
                this.requestCallback.Remove(rpcId);
                // Complete the task so awaiters do not hang forever after cancellation.
                tcs.TrySetCanceled();
            });

            return tcs.Task;
        }

        /// <summary>
        /// RPC call: sends <paramref name="request"/> and waits for the matching response.
        /// Same as the cancellable overload with a token that is never cancelled.
        /// </summary>
        /// <typeparam name="Response">Expected response type.</typeparam>
        /// <param name="request">Message to send.</param>
        /// <returns>A task that yields the response or faults as described on the cancellable overload.</returns>
        public Task<Response> CallAsync<Response>(object request) where Response : IErrorMessage
        {
            return this.CallAsync<Response>(request, CancellationToken.None);
        }

        /// <summary>
        /// Waits for the next incoming message of type <typeparamref name="Response"/> without
        /// sending anything. Only one waiter per opcode is kept; a later call replaces an
        /// earlier one.
        /// </summary>
        /// <typeparam name="Response">Awaited message type; its name must be a member of <c>Opcode</c>.</typeparam>
        /// <param name="cancellationToken">
        /// When cancelled, this waiter is removed (if it is still the registered one) and the
        /// returned task is cancelled.
        /// </param>
        /// <returns>
        /// A task that yields the message, or faults with <c>GameException</c> when it cannot
        /// be processed.
        /// </returns>
        public Task<Response> WaitAsync<Response>(CancellationToken cancellationToken) where Response : class
        {
            var tcs = new TaskCompletionSource<Response>();
            Opcode opcode = EnumHelper.FromString<Opcode>(typeof(Response).Name);

            Action<byte[], int, int> callback = (bytes, offset, count) =>
            {
                try
                {
                    Response response = MongoHelper.FromBson<Response>(bytes, offset, count);
                    Opcode op = EnumHelper.FromString<Opcode>(response.GetType().Name);
                    if (OpcodeHelper.IsNeedDebugLogMessage(op))
                    {
                        Log.Debug(MongoHelper.ToJson(response));
                    }

                    tcs.TrySetResult(response);
                }
                catch (Exception e)
                {
                    tcs.TrySetException(new GameException($"Wait Error: {typeof(Response).FullName}", e));
                }
            };
            this.waitCallback[opcode] = callback;

            cancellationToken.Register(() =>
            {
                // Only remove our own registration; a newer waiter may have replaced it.
                Action<byte[], int, int> current;
                if (this.waitCallback.TryGetValue(opcode, out current) && ReferenceEquals(current, callback))
                {
                    this.waitCallback.Remove(opcode);
                }

                // Complete the task so awaiters do not hang forever after cancellation.
                tcs.TrySetCanceled();
            });

            return tcs.Task;
        }

        /// <summary>
        /// Waits for the next incoming message of type <typeparamref name="Response"/> without
        /// sending anything. Same as the cancellable overload with a token that is never cancelled.
        /// </summary>
        /// <typeparam name="Response">Awaited message type.</typeparam>
        /// <returns>A task that yields the message or faults as described on the cancellable overload.</returns>
        public Task<Response> WaitAsync<Response>() where Response : class
        {
            return this.WaitAsync<Response>(CancellationToken.None);
        }

        /// <summary>
        /// Returns the channel stored under <paramref name="channelType"/>, or null when none is stored.
        /// </summary>
        public AChannel GetChannel(NetChannelType channelType)
        {
            AChannel channel;
            this.channels.TryGetValue(channelType, out channel);
            return channel;
        }

        /// <summary>
        /// Sends <paramref name="message"/> with rpc id 0, i.e. not as part of an RPC call.
        /// </summary>
        public void Send(object message)
        {
            this.Send(message, 0);
        }

        /// <summary>
        /// Returns true when a channel is stored under <paramref name="channelType"/> and its
        /// Id is not 0 (Id 0 is what Close and the update loop treat as a dead channel).
        /// </summary>
        public bool IsChannelConnected(NetChannelType channelType)
        {
            AChannel channel = this.GetChannel(channelType);
            return channel != null && channel.Id != 0;
        }

        /// <summary>
        /// Serialises <paramref name="message"/> and sends it as three parts: 2 byte opcode,
        /// 4 byte rpc id, BSON body. The channel is chosen from the numeric opcode.
        /// </summary>
        /// <exception cref="GameException">No channel is stored for the selected channel type.</exception>
        private void Send(object message, uint rpcId)
        {
            Opcode opcode = EnumHelper.FromString<Opcode>(message.GetType().Name);
            ushort opcodeValue = (ushort)opcode;

            byte[] opcodeBytes = BitConverter.GetBytes(opcodeValue);
            byte[] seqBytes = BitConverter.GetBytes(rpcId);
            byte[] messageBytes = MongoHelper.ToBson(message);

            NetChannelType channelType;
            if (opcodeValue > LoginOpcodeExclusiveMin && opcodeValue < LoginOpcodeExclusiveMax)
            {
                channelType = NetChannelType.Login;
            }
            else if (opcodeValue > BattleOpcodeExclusiveMin && opcodeValue <= BattleOpcodeInclusiveMax)
            {
                channelType = NetChannelType.Battle;
            }
            else
            {
                channelType = NetChannelType.Gate;
            }

            AChannel channel = this.GetChannel(channelType);
            if (channel == null)
            {
                throw new GameException("game channel not found!");
            }

            channel.Send(new List<byte[]> { opcodeBytes, seqBytes, messageBytes });

            if (OpcodeHelper.IsNeedDebugLogMessage(opcode))
            {
                Log.Debug(MongoHelper.ToJson(message));
            }
        }

        /// <summary>
        /// Disposes the component and every channel it holds. Calling it again after the
        /// component's Id has become 0 is a no-op.
        /// </summary>
        public override void Dispose()
        {
            if (this.Id == 0)
            {
                return;
            }

            base.Dispose();

            foreach (AChannel channel in this.channels.Values.ToArray())
            {
                channel.Dispose();
            }

            // Do not keep references to disposed channels.
            this.channels.Clear();
        }
    }
}