| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 |
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.IO;
- using System.Net;
- using System.Net.Sockets;
- using System.Runtime.InteropServices;
- namespace ET
- {
- public struct KcpWaitPacket
- {
- public long ActorId;
- public MemoryStream MemoryStream;
- }
-
- public class KChannel : AChannel
- {
- public KService Service;
-
- // 保存所有的channel
- public static readonly Dictionary<uint, KChannel> kChannels = new Dictionary<uint, KChannel>();
-
- public static readonly ConcurrentDictionary<long, ulong> idLocalRemoteConn = new ConcurrentDictionary<long, ulong>();
-
- private Socket socket;
- private IntPtr kcp;
- private readonly Queue<KcpWaitPacket> sendBuffer = new Queue<KcpWaitPacket>();
- private uint lastRecvTime;
-
- public readonly uint CreateTime;
- public uint LocalConn { get; set; }
- public uint RemoteConn { get; set; }
- private readonly byte[] sendCache = new byte[1024 * 1024];
-
- public bool IsConnected { get; private set; }
- public string RealAddress { get; set; }
-
- private void InitKcp()
- {
- switch (this.Service.ServiceType)
- {
- case ServiceType.Inner:
- Kcp.KcpNodelay(kcp, 1, 10, 2, 1);
- Kcp.KcpWndsize(kcp, 1024 * 100, 1024 * 100);
- Kcp.KcpSetmtu(kcp, 1400); // 默认1400
- Kcp.KcpSetminrto(kcp, 10);
- break;
- case ServiceType.Outer:
- Kcp.KcpNodelay(kcp, 1, 10, 2, 1);
- Kcp.KcpWndsize(kcp, 128, 128);
- Kcp.KcpSetmtu(kcp, 470);
- Kcp.KcpSetminrto(kcp, 10);
- break;
- }
- }
-
- // connect
- public KChannel(long id, uint localConn, Socket socket, IPEndPoint remoteEndPoint, KService kService)
- {
- this.LocalConn = localConn;
- if (kChannels.ContainsKey(this.LocalConn))
- {
- throw new Exception($"channel create error: {this.LocalConn} {remoteEndPoint} {this.ChannelType}");
- }
- this.Id = id;
- this.ChannelType = ChannelType.Connect;
-
- Log.Info($"channel create: {this.Id} {this.LocalConn} {remoteEndPoint} {this.ChannelType}");
-
- this.Service = kService;
- this.RemoteAddress = remoteEndPoint;
- this.socket = socket;
- this.kcp = Kcp.KcpCreate(this.RemoteConn, (IntPtr) this.LocalConn);
- kChannels.Add(this.LocalConn, this);
-
- this.lastRecvTime = kService.TimeNow;
- this.CreateTime = kService.TimeNow;
- this.Connect();
- }
- // accept
- public KChannel(long id, uint localConn, uint remoteConn, Socket socket, IPEndPoint remoteEndPoint, KService kService)
- {
- if (kChannels.ContainsKey(this.LocalConn))
- {
- throw new Exception($"channel create error: {localConn} {remoteEndPoint} {this.ChannelType}");
- }
- this.Id = id;
- this.ChannelType = ChannelType.Accept;
-
- Log.Info($"channel create: {this.Id} {localConn} {remoteConn} {remoteEndPoint} {this.ChannelType}");
- this.Service = kService;
- this.LocalConn = localConn;
- this.RemoteConn = remoteConn;
- this.RemoteAddress = remoteEndPoint;
- this.socket = socket;
- this.kcp = Kcp.KcpCreate(this.RemoteConn, (IntPtr) localConn);
- kChannels.Add(this.LocalConn, this);
-
- this.lastRecvTime = kService.TimeNow;
- this.CreateTime = kService.TimeNow;
- this.InitKcp();
- }
-
- #region 网络线程
-
- public override void Dispose()
- {
- if (this.IsDisposed)
- {
- return;
- }
- uint localConn = this.LocalConn;
- uint remoteConn = this.RemoteConn;
- Log.Info($"channel dispose: {this.Id} {localConn} {remoteConn}");
-
- kChannels.Remove(localConn);
- idLocalRemoteConn.TryRemove(this.Id, out ulong _);
-
- long id = this.Id;
- this.Id = 0;
- this.Service.Remove(id);
- try
- {
- //this.Service.Disconnect(localConn, remoteConn, this.Error, this.RemoteAddress, 3);
- }
- catch (Exception e)
- {
- Log.Error(e);
- }
- if (this.kcp != IntPtr.Zero)
- {
- Kcp.KcpRelease(this.kcp);
- this.kcp = IntPtr.Zero;
- }
- this.socket = null;
- }
- public void HandleConnnect()
- {
- // 如果连接上了就不用处理了
- if (this.IsConnected)
- {
- return;
- }
- this.kcp = Kcp.KcpCreate(this.RemoteConn, new IntPtr(this.LocalConn));
- this.InitKcp();
- ulong localRmoteConn = ((ulong) this.RemoteConn << 32) | this.LocalConn;
- idLocalRemoteConn.TryAdd(this.Id, localRmoteConn);
- Log.Info($"channel connected: {this.Id} {this.LocalConn} {this.RemoteConn} {this.RemoteAddress}");
- this.IsConnected = true;
- this.lastRecvTime = this.Service.TimeNow;
-
- while (true)
- {
- if (this.sendBuffer.Count <= 0)
- {
- break;
- }
-
- KcpWaitPacket buffer = this.sendBuffer.Dequeue();
- this.KcpSend(buffer);
- }
- }
- /// <summary>
- /// 发送请求连接消息
- /// </summary>
- private void Connect()
- {
- try
- {
- uint timeNow = this.Service.TimeNow;
-
- this.lastRecvTime = timeNow;
-
- byte[] buffer = sendCache;
- buffer.WriteTo(0, KcpProtocalType.SYN);
- buffer.WriteTo(1, this.LocalConn);
- buffer.WriteTo(5, this.RemoteConn);
- this.socket.SendTo(buffer, 0, 9, SocketFlags.None, this.RemoteAddress);
- Log.Info($"kchannel connect {this.Id} {this.LocalConn} {this.RemoteConn} {this.RealAddress} {this.socket.LocalEndPoint}");
- // 200毫秒后再次update发送connect请求
- this.Service.AddToUpdateNextTime(timeNow + 300, this.Id);
- }
- catch (Exception e)
- {
- Log.Error(e);
- this.OnError(ErrorCode.ERR_SocketCantSend);
- }
- }
- public void Update()
- {
- if (this.IsDisposed)
- {
- return;
- }
- uint timeNow = this.Service.TimeNow;
-
- // 如果还没连接上,发送连接请求
- if (!this.IsConnected)
- {
- // 20秒没连接上则报错
- if (timeNow - this.CreateTime > 10 * 1000)
- {
- Log.Error($"kChannel connect timeout: {this.Id} {this.RemoteConn} {timeNow} {this.CreateTime} {this.ChannelType} {this.RemoteAddress}");
- this.OnError(ErrorCode.ERR_KcpConnectTimeout);
- return;
- }
- switch (ChannelType)
- {
- case ChannelType.Connect:
- this.Connect();
- break;
- }
- return;
- }
- try
- {
- Kcp.KcpUpdate(this.kcp, timeNow);
- }
- catch (Exception e)
- {
- Log.Error(e);
- this.OnError(ErrorCode.ERR_SocketError);
- return;
- }
- if (this.kcp != IntPtr.Zero)
- {
- uint nextUpdateTime = Kcp.KcpCheck(this.kcp, timeNow);
- this.Service.AddToUpdateNextTime(nextUpdateTime, this.Id);
- }
- }
- public void HandleRecv(byte[] date, int offset, int length)
- {
- if (this.IsDisposed)
- {
- return;
- }
- this.IsConnected = true;
-
- Kcp.KcpInput(this.kcp, date, offset, length);
- this.Service.AddToUpdateNextTime(0, this.Id);
- while (true)
- {
- if (this.IsDisposed)
- {
- break;
- }
- int n = Kcp.KcpPeeksize(this.kcp);
- if (n < 0)
- {
- break;
- }
- if (n == 0)
- {
- this.OnError((int)SocketError.NetworkReset);
- break;
- }
- MemoryStream ms = MessageSerializeHelper.GetStream(n);
-
- ms.SetLength(n);
- ms.Seek(0, SeekOrigin.Begin);
- byte[] buffer = ms.GetBuffer();
- int count = Kcp.KcpRecv(this.kcp, buffer, n);
- if (n != count)
- {
- break;
- }
- switch (this.Service.ServiceType)
- {
- case ServiceType.Inner:
- ms.Seek(Packet.ActorIdLength + Packet.OpcodeLength, SeekOrigin.Begin);
- break;
- case ServiceType.Outer:
- ms.Seek(Packet.OpcodeLength, SeekOrigin.Begin);
- break;
- }
- this.lastRecvTime = this.Service.TimeNow;
- this.OnRead(ms);
- }
- }
- public void Output(IntPtr bytes, int count)
- {
- if (this.IsDisposed)
- {
- return;
- }
- try
- {
- // 没连接上 kcp不往外发消息, 其实本来没连接上不会调用update,这里只是做一层保护
- if (!this.IsConnected)
- {
- return;
- }
-
- if (count == 0)
- {
- Log.Error($"output 0");
- return;
- }
- byte[] buffer = this.sendCache;
- buffer.WriteTo(0, KcpProtocalType.MSG);
- // 每个消息头部写下该channel的id;
- buffer.WriteTo(1, this.LocalConn);
- Marshal.Copy(bytes, buffer, 5, count);
- this.socket.SendTo(buffer, 0, count + 5, SocketFlags.None, this.RemoteAddress);
- }
- catch (Exception e)
- {
- Log.Error(e);
- this.OnError(ErrorCode.ERR_SocketCantSend);
- }
- }
- private void KcpSend(KcpWaitPacket kcpWaitPacket)
- {
- if (this.IsDisposed)
- {
- return;
- }
- MemoryStream memoryStream = kcpWaitPacket.MemoryStream;
- if (this.Service.ServiceType == ServiceType.Inner)
- {
- memoryStream.GetBuffer().WriteTo(0, kcpWaitPacket.ActorId);
- }
- int count = (int) (memoryStream.Length - memoryStream.Position);
- Kcp.KcpSend(this.kcp, memoryStream.GetBuffer(), (int)memoryStream.Position, count);
- this.Service.AddToUpdateNextTime(0, this.Id);
- }
-
- public void Send(long actorId, MemoryStream stream)
- {
- if (this.kcp != IntPtr.Zero)
- {
- // 检查等待发送的消息,如果超出最大等待大小,应该断开连接
- int n = Kcp.KcpWaitsnd(this.kcp);
- int maxWaitSize = 0;
- switch (this.Service.ServiceType)
- {
- case ServiceType.Inner:
- maxWaitSize = Kcp.InnerMaxWaitSize;
- break;
- case ServiceType.Outer:
- maxWaitSize = Kcp.OuterMaxWaitSize;
- break;
- default:
- throw new ArgumentOutOfRangeException();
- }
-
- if (n > maxWaitSize)
- {
- Log.Error($"kcp wait snd too large: {n}: {this.Id} {this.RemoteConn}");
- this.OnError(ErrorCode.ERR_KcpWaitSendSizeTooLarge);
- return;
- }
- }
- KcpWaitPacket kcpWaitPacket = new KcpWaitPacket() { ActorId = actorId, MemoryStream = stream };
- if (!this.IsConnected)
- {
- this.sendBuffer.Enqueue(kcpWaitPacket);
- return;
- }
- this.KcpSend(kcpWaitPacket);
- }
-
- private void OnRead(MemoryStream memoryStream)
- {
- this.Service.OnRead(this.Id, memoryStream);
- }
-
- public void OnError(int error)
- {
- long channelId = this.Id;
- this.Service.Remove(channelId);
- this.Service.OnError(channelId, error);
- }
-
- #endregion
- }
- }
|