| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- using System;
- using System.Collections.Generic;
- using System.IO;
- using System.Linq;
- using System.Net;
- using System.Net.Sockets;
- namespace ET
- {
- public sealed class TService : AService
- {
- private readonly Dictionary<long, TChannel> idChannels = new Dictionary<long, TChannel>();
- private readonly SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();
- private Socket acceptor;
- public HashSet<long> NeedStartSend = new HashSet<long>();
- // public TService(ThreadSynchronizationContext threadSynchronizationContext, ServiceType serviceType)
- // {
- // this.ServiceType = serviceType;
- // this.ThreadSynchronizationContext = threadSynchronizationContext;
- // }
- public TService(IPEndPoint ipEndPoint, ServiceType serviceType)
- {
- this.ServiceType = serviceType;
- this.acceptor = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
- this.acceptor.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
- this.innArgs.Completed += this.OnComplete;
- this.acceptor.Bind(ipEndPoint);
- this.acceptor.Listen(1000);
- ThreadSynchronizationContext.Instance.PostNext(this.AcceptAsync);
- }
- private void OnComplete(object sender, SocketAsyncEventArgs e)
- {
- switch (e.LastOperation)
- {
- case SocketAsyncOperation.Accept:
- SocketError socketError = e.SocketError;
- Socket acceptSocket = e.AcceptSocket;
- ThreadSynchronizationContext.Instance.Post(() =>
- {
- this.OnAcceptComplete(socketError, acceptSocket);
- });
- break;
- default:
- throw new Exception($"socket error: {e.LastOperation}");
- }
- }
- #region 网络线程
- private void OnAcceptComplete(SocketError socketError, Socket acceptSocket)
- {
- if (this.acceptor == null)
- {
- return;
- }
- if (socketError != SocketError.Success)
- {
- Log.Error($"accept error {socketError}");
- return;
- }
- try
- {
- long id = this.CreateAcceptChannelId(0);
- TChannel channel = new TChannel(id, acceptSocket, this);
- this.idChannels.Add(channel.Id, channel);
- long channelId = channel.Id;
- this.OnAccept(channelId, channel.RemoteAddress);
- }
- catch (Exception exception)
- {
- Log.Error(exception);
- }
- // 开始新的accept
- this.AcceptAsync();
- }
- private void AcceptAsync()
- {
- this.innArgs.AcceptSocket = null;
- if (this.acceptor.AcceptAsync(this.innArgs))
- {
- return;
- }
- OnAcceptComplete(this.innArgs.SocketError, this.innArgs.AcceptSocket);
- }
- public override void Create(long id, IPEndPoint ipEndPoint, string address)
- {
- if (this.idChannels.TryGetValue(id, out TChannel _))
- {
- return;
- }
- TChannel channel = new TChannel(id, ipEndPoint, this);
- this.idChannels.Add(channel.Id, channel);
- }
- // protected override void Get(long id, IPEndPoint address)
- // {
- // if (this.idChannels.TryGetValue(id, out TChannel _))
- // {
- // return;
- // }
- // this.Create(address, id);
- // }
- private TChannel Get(long id)
- {
- TChannel channel = null;
- this.idChannels.TryGetValue(id, out channel);
- return channel;
- }
- public override void Dispose()
- {
- this.acceptor?.Close();
- this.acceptor = null;
- this.innArgs.Dispose();
- // ThreadSynchronizationContext = null;
- foreach (long id in this.idChannels.Keys.ToArray())
- {
- TChannel channel = this.idChannels[id];
- channel.Dispose();
- }
- this.idChannels.Clear();
- }
- public override void Remove(long id, int error = 0)
- {
- if (this.idChannels.TryGetValue(id, out TChannel channel))
- {
- channel.Error = error;
- channel.Dispose();
- }
- this.idChannels.Remove(id);
- }
- public override void Send(long channelId, long actorId, MemoryStream stream)
- {
- try
- {
- TChannel aChannel = this.Get(channelId);
- if (aChannel == null)
- {
- this.OnError(channelId, ErrorCore.ERR_SendMessageNotFoundTChannel);
- return;
- }
- aChannel.Send(actorId, stream);
- }
- catch (Exception e)
- {
- Log.Error(e);
- }
- }
- public override void Update()
- {
- foreach (long channelId in this.NeedStartSend)
- {
- TChannel tChannel = this.Get(channelId);
- tChannel?.Update();
- }
- this.NeedStartSend.Clear();
- }
- public override bool IsDisposed()
- {
- return this.acceptor == null;
- }
- #endregion
- }
- }
|