- using System;
- using System.IO;
- using System.Net;
- using System.Net.Sockets;
- namespace ET
- {
- /// <summary>
- /// Wraps a Socket and pushes its completion callbacks to the main thread for processing.
- /// </summary>
- public sealed class TChannel : AChannel
- {
- private readonly TService Service;
- private Socket socket;
- private SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();
- private SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs();
- private readonly CircularBuffer recvBuffer = new CircularBuffer();
- private readonly CircularBuffer sendBuffer = new CircularBuffer();
- private bool isSending;
- private bool isConnected;
- private readonly PacketParser parser;
- private readonly byte[] sendCache = new byte[Packet.OpcodeLength + Packet.ActorIdLength];
- private void OnComplete(object sender, SocketAsyncEventArgs e)
- {
- switch (e.LastOperation)
- {
- case SocketAsyncOperation.Connect:
- ThreadSynchronizationContext.Instance.Post(() => OnConnectComplete(e));
- break;
- case SocketAsyncOperation.Receive:
- ThreadSynchronizationContext.Instance.Post(() => OnRecvComplete(e));
- break;
- case SocketAsyncOperation.Send:
- ThreadSynchronizationContext.Instance.Post(() => OnSendComplete(e));
- break;
- case SocketAsyncOperation.Disconnect:
- ThreadSynchronizationContext.Instance.Post(() => OnDisconnectComplete(e));
- break;
- default:
- throw new Exception($"socket error: {e.LastOperation}");
- }
- }
- #region Network thread
- public TChannel(long id, IPEndPoint ipEndPoint, TService service)
- {
- this.ChannelType = ChannelType.Connect;
- this.Id = id;
- this.Service = service;
- this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
- this.socket.NoDelay = true;
- this.parser = new PacketParser(this.recvBuffer, this.Service);
- this.innArgs.Completed += this.OnComplete;
- this.outArgs.Completed += this.OnComplete;
- this.RemoteAddress = ipEndPoint;
- this.isConnected = false;
- this.isSending = false;
- ThreadSynchronizationContext.Instance.PostNext(this.ConnectAsync);
- }
- public TChannel(long id, Socket socket, TService service)
- {
- this.ChannelType = ChannelType.Accept;
- this.Id = id;
- this.Service = service;
- this.socket = socket;
- this.socket.NoDelay = true;
- this.parser = new PacketParser(this.recvBuffer, this.Service);
- this.innArgs.Completed += this.OnComplete;
- this.outArgs.Completed += this.OnComplete;
- this.RemoteAddress = (IPEndPoint)socket.RemoteEndPoint;
- this.isConnected = true;
- this.isSending = false;
- // Start reading and writing on the next frame
- ThreadSynchronizationContext.Instance.PostNext(() =>
- {
- this.StartRecv();
- this.StartSend();
- });
- }
- public override void Dispose()
- {
- if (this.IsDisposed)
- {
- return;
- }
- Log.Info($"channel dispose: {this.Id} {this.RemoteAddress}");
- long id = this.Id;
- this.Id = 0;
- this.Service.Remove(id);
- this.socket.Close();
- this.innArgs.Dispose();
- this.outArgs.Dispose();
- this.innArgs = null;
- this.outArgs = null;
- this.socket = null;
- }
- public void Send(long actorId, MemoryStream stream)
- {
- if (this.IsDisposed)
- {
- throw new Exception("TChannel has already been disposed; cannot send message");
- }
- switch (this.Service.ServiceType)
- {
- case ServiceType.Inner:
- {
- int messageSize = (int)(stream.Length - stream.Position);
- if (messageSize > ushort.MaxValue * 16)
- {
- throw new Exception($"send packet too large: {stream.Length} {stream.Position}");
- }
- this.sendCache.WriteTo(0, messageSize);
- this.sendBuffer.Write(this.sendCache, 0, PacketParser.InnerPacketSizeLength);
- // Write the actorId at offset 0 of the stream's underlying buffer
- stream.GetBuffer().WriteTo(0, actorId);
- this.sendBuffer.Write(stream.GetBuffer(), (int)stream.Position,
- (int)(stream.Length - stream.Position));
- break;
- }
- case ServiceType.Outer:
- {
- stream.Seek(Packet.ActorIdLength, SeekOrigin.Begin); // Outer (external) network packets do not carry an actorId
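- // The outer size header is a ushort, so reject payloads that would be silently truncated by the cast
- int messageSize = (int)(stream.Length - stream.Position);
- if (messageSize > ushort.MaxValue)
- {
- throw new Exception($"send packet too large: {stream.Length} {stream.Position}");
- }
- this.sendCache.WriteTo(0, (ushort)messageSize);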
- this.sendBuffer.Write(this.sendCache, 0, PacketParser.OuterPacketSizeLength);
- this.sendBuffer.Write(stream.GetBuffer(), (int)stream.Position,
- (int)(stream.Length - stream.Position));
- break;
- }
- }
- if (!this.isSending)
- {
- // Queue this channel's id in Service.NeedStartSend instead of calling this.StartSend() directly
- this.Service.NeedStartSend.Add(this.Id);
- }
- }
- private void ConnectAsync()
- {
- this.outArgs.RemoteEndPoint = this.RemoteAddress;
- if (this.socket.ConnectAsync(this.outArgs))
- {
- return;
- }
- OnConnectComplete(this.outArgs);
- }
- private void OnConnectComplete(object o)
- {
- if (this.socket == null)
- {
- return;
- }
- SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
- if (e.SocketError != SocketError.Success)
- {
- this.OnError((int)e.SocketError);
- return;
- }
- e.RemoteEndPoint = null;
- this.isConnected = true;
- this.StartRecv();
- this.StartSend();
- }
- private void OnDisconnectComplete(object o)
- {
- SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
- this.OnError((int)e.SocketError);
- }
- private void StartRecv()
- {
- while (true)
- {
- try
- {
- if (this.socket == null)
- {
- return;
- }
- int size = this.recvBuffer.ChunkSize - this.recvBuffer.LastIndex;
- this.innArgs.SetBuffer(this.recvBuffer.Last, this.recvBuffer.LastIndex, size);
- }
- catch (Exception e)
- {
- Log.Error($"tchannel error: {this.Id}\n{e}");
- this.OnError(ErrorCore.ERR_TChannelRecvError);
- return;
- }
- if (this.socket.ReceiveAsync(this.innArgs))
- {
- return;
- }
- this.HandleRecv(this.innArgs);
- }
- }
- private void OnRecvComplete(object o)
- {
- this.HandleRecv(o);
- if (this.socket == null)
- {
- return;
- }
- this.StartRecv();
- }
- private void HandleRecv(object o)
- {
- if (this.socket == null)
- {
- return;
- }
- SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
- if (e.SocketError != SocketError.Success)
- {
- this.OnError((int)e.SocketError);
- return;
- }
- if (e.BytesTransferred == 0)
- {
- this.OnError(ErrorCore.ERR_PeerDisconnect);
- return;
- }
- this.recvBuffer.LastIndex += e.BytesTransferred;
- if (this.recvBuffer.LastIndex == this.recvBuffer.ChunkSize)
- {
- this.recvBuffer.AddLast();
- this.recvBuffer.LastIndex = 0;
- }
- // Invoke the read callback for each received message
- while (true)
- {
- // Parse and dispatch messages in a loop; handling a message may disconnect the session
- if (this.socket == null)
- {
- return;
- }
- try
- {
- bool ret = this.parser.Parse(out MemoryStream memoryBuffer);
- if (!ret)
- {
- break;
- }
- this.OnRead(memoryBuffer);
- }
- catch (Exception ee)
- {
- Log.DetectionError($"ip: {this.RemoteAddress} {ee}");
- this.OnError(ErrorCore.ERR_SocketError);
- return;
- }
- }
- }
- public void Update()
- {
- this.StartSend();
- }
- private void StartSend()
- {
- if (!this.isConnected)
- {
- return;
- }
- if (this.isSending)
- {
- return;
- }
- while (true)
- {
- try
- {
- if (this.socket == null)
- {
- this.isSending = false;
- return;
- }
- // No data to send
- if (this.sendBuffer.Length == 0)
- {
- this.isSending = false;
- return;
- }
- this.isSending = true;
- int sendSize = this.sendBuffer.ChunkSize - this.sendBuffer.FirstIndex;
- if (sendSize > this.sendBuffer.Length)
- {
- sendSize = (int)this.sendBuffer.Length;
- }
- this.outArgs.SetBuffer(this.sendBuffer.First, this.sendBuffer.FirstIndex, sendSize);
- if (this.socket.SendAsync(this.outArgs))
- {
- return;
- }
- HandleSend(this.outArgs);
- }
- catch (Exception e)
- {
- throw new Exception(
- $"socket set buffer error: {this.sendBuffer.First.Length}, {this.sendBuffer.FirstIndex}", e);
- }
- }
- }
- private void OnSendComplete(object o)
- {
- HandleSend(o);
- this.isSending = false;
- this.StartSend();
- }
- private void HandleSend(object o)
- {
- if (this.socket == null)
- {
- return;
- }
- SocketAsyncEventArgs e = (SocketAsyncEventArgs)o;
- if (e.SocketError != SocketError.Success)
- {
- this.OnError((int)e.SocketError);
- return;
- }
- if (e.BytesTransferred == 0)
- {
- this.OnError(ErrorCore.ERR_PeerDisconnect);
- return;
- }
- this.sendBuffer.FirstIndex += e.BytesTransferred;
- if (this.sendBuffer.FirstIndex == this.sendBuffer.ChunkSize)
- {
- this.sendBuffer.FirstIndex = 0;
- this.sendBuffer.RemoveFirst();
- }
- }
- private void OnRead(MemoryStream memoryStream)
- {
- try
- {
- long channelId = this.Id;
- this.Service.ReadCallback(channelId, memoryStream);
- }
- catch (Exception e)
- {
- Log.DetectionError($"{this.RemoteAddress} {memoryStream.Length} {e}");
- // Disconnect the session on any message-parsing exception, to guard against forged client messages
- this.OnError(ErrorCore.ERR_PacketParserError);
- }
- }
- private void OnError(int error)
- {
- Log.Info($"TChannel OnError: {error} {this.RemoteAddress}");
- long channelId = this.Id;
- this.Service.Remove(channelId);
- this.Service.ErrorCallback(channelId, error);
- }
- #endregion
- }
- }