Explorar o código

实现actorProxy,向一个actor发送消息,actorproxy会排队发送,如果发现actor不存在,则重新向location请求actor地址,重新发送,重试3次

tanghai %!s(int64=8) %!d(string=hai) anos
pai
achega
331dc20242

+ 1 - 1
Server/Hotfix/Message/C2M_ReloadHandler.cs

@@ -21,7 +21,7 @@ namespace Hotfix
 					}
 					InnerConfig innerConfig = startConfig.GetComponent<InnerConfig>();
 					Session serverSession = netInnerComponent.Get(innerConfig.Address);
-					await serverSession.Call<M2A_Reload, A2M_Reload>(new M2A_Reload());
+					await serverSession.Call<A2M_Reload>(new M2A_Reload());
 				}
 				reply(response);
 			}

+ 1 - 1
Server/Hotfix/Message/C2R_LoginHandler.cs

@@ -25,7 +25,7 @@ namespace Hotfix
 				Session gateSession = Game.Scene.GetComponent<NetInnerComponent>().Get(innerAddress);
 
 				// 向gate请求一个key,客户端可以拿着这个key连接gate
-				G2R_GetLoginKey g2RGetLoginKey = await gateSession.Call<R2G_GetLoginKey, G2R_GetLoginKey>(new R2G_GetLoginKey());
+				G2R_GetLoginKey g2RGetLoginKey = await gateSession.Call<G2R_GetLoginKey>(new R2G_GetLoginKey());
 
 				string outerAddress = $"{config.GetComponent<OuterConfig>().Host}:{config.GetComponent<OuterConfig>().Port}";
 

+ 1 - 1
Server/Hotfix/Message/ObjectUnLockRequestHandler.cs

@@ -11,7 +11,7 @@ namespace Hotfix
 			ObjectUnLockResponse response = new ObjectUnLockResponse();
 			try
 			{
-				Game.Scene.GetComponent<LocationComponent>().UnLock(message.Key);
+				Game.Scene.GetComponent<LocationComponent>().UnLock(message.Key, message.Value);
 				reply(response);
 			}
 			catch (Exception e)

+ 1 - 1
Server/Model/Component/BenchmarkComponent.cs

@@ -36,7 +36,7 @@ namespace Model
 					while (i < 10000000)
 					{
 						++i;
-						await session.Call<C2R_Ping, R2C_Ping>(new C2R_Ping());
+						await session.Call<R2C_Ping>(new C2R_Ping());
 
 						++this.k;
 

+ 7 - 7
Server/Model/Component/DBProxyComponent.cs

@@ -20,31 +20,31 @@ namespace Model
 		public async Task Save(Entity entity, bool needCache = true)
 		{
 			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(dbAddress);
-			await session.Call<DBSaveRequest, DBSaveResponse>(new DBSaveRequest { Entity = entity, NeedCache = needCache});
+			await session.Call<DBSaveResponse>(new DBSaveRequest { Entity = entity, NeedCache = needCache});
 		}
 
 		public async Task SaveBatch(List<Entity> entitys, bool needCache = true)
 		{
 			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(dbAddress);
-			await session.Call<DBSaveBatchRequest, DBSaveBatchResponse>(new DBSaveBatchRequest { Entitys = entitys, NeedCache = needCache});
+			await session.Call<DBSaveBatchResponse>(new DBSaveBatchRequest { Entitys = entitys, NeedCache = needCache});
 		}
 
 		public async Task Save(Entity entity, bool needCache, CancellationToken cancellationToken)
 		{
 			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(dbAddress);
-			await session.Call<DBSaveRequest, DBSaveResponse>(new DBSaveRequest { Entity = entity, NeedCache = needCache}, cancellationToken);
+			await session.Call<DBSaveResponse>(new DBSaveRequest { Entity = entity, NeedCache = needCache}, cancellationToken);
 		}
 
 		public async void SaveLog(Entity entity)
 		{
 			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(dbAddress);
-			await session.Call<DBSaveRequest, DBSaveResponse>(new DBSaveRequest { Entity = entity,  NeedCache = false, CollectionName = "Log" });
+			await session.Call<DBSaveResponse>(new DBSaveRequest { Entity = entity,  NeedCache = false, CollectionName = "Log" });
 		}
 
 		public async Task<T> Query<T>(long id, bool needCache = true) where T: Entity
 		{
 			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(dbAddress);
-			DBQueryResponse dbQueryResponse = await session.Call<DBQueryRequest, DBQueryResponse>(new DBQueryRequest { CollectionName = typeof(T).Name, Id = id, NeedCache = needCache });
+			DBQueryResponse dbQueryResponse = await session.Call<DBQueryResponse>(new DBQueryRequest { CollectionName = typeof(T).Name, Id = id, NeedCache = needCache });
 			return (T)dbQueryResponse.Entity;
 		}
 
@@ -52,7 +52,7 @@ namespace Model
 		{
 			List<T> list = new List<T>();
 			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(dbAddress);
-			DBQueryBatchResponse dbQueryBatchResponse = await session.Call<DBQueryBatchRequest, DBQueryBatchResponse>(new DBQueryBatchRequest { CollectionName = typeof(T).Name, IdList = ids, NeedCache = needCache});
+			DBQueryBatchResponse dbQueryBatchResponse = await session.Call<DBQueryBatchResponse>(new DBQueryBatchRequest { CollectionName = typeof(T).Name, IdList = ids, NeedCache = needCache});
 			foreach (Entity entity in dbQueryBatchResponse.Entitys)
 			{
 				list.Add((T)entity);
@@ -64,7 +64,7 @@ namespace Model
 		{
 			List<T> list = new List<T>();
 			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(dbAddress);
-			DBQueryJsonResponse dbQueryJsonResponse = await session.Call<DBQueryJsonRequest, DBQueryJsonResponse>(new DBQueryJsonRequest { CollectionName = typeof(T).Name, Json = json, NeedCache = needCache});
+			DBQueryJsonResponse dbQueryJsonResponse = await session.Call<DBQueryJsonResponse>(new DBQueryJsonRequest { CollectionName = typeof(T).Name, Json = json, NeedCache = needCache});
 			foreach (Entity entity in dbQueryJsonResponse.Entitys)
 			{
 				list.Add((T)entity);

+ 3 - 1
Server/Model/Component/LocationComponent.cs

@@ -118,10 +118,12 @@ namespace Model
 			this.lockSet.Add(key);
 		}
 
-		public void UnLock(long key)
+		public void UnLock(long key, string value)
 		{
 			this.lockSet.Remove(key);
 
+			this.locations[key] = value;
+
 			if (!this.taskQueues.TryGetValue(key, out Queue<LocationTask> tasks))
 			{
 				return;

+ 3 - 3
Server/Model/Component/LocationProxyComponent.cs

@@ -23,19 +23,19 @@ namespace Model
 		public async Task Add(long key)
 		{
 			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.LocationAddress);
-			await session.Call<ObjectAddRequest, ObjectAddResponse>(new ObjectAddRequest() { Key = key });
+			await session.Call<ObjectAddResponse>(new ObjectAddRequest() { Key = key });
 		}
 
 		public async Task Remove(long key)
 		{
 			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.LocationAddress);
-			await session.Call<ObjectRemoveRequest, ObjectRemoveResponse>(new ObjectRemoveRequest() { Key = key });
+			await session.Call<ObjectRemoveResponse>(new ObjectRemoveRequest() { Key = key });
 		}
 
 		public async Task<string> Get(long key)
 		{
 			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.LocationAddress);
-			ObjectGetResponse response = await session.Call<ObjectGetRequest, ObjectGetResponse>(new ObjectGetRequest() { Key = key });
+			ObjectGetResponse response = await session.Call<ObjectGetResponse>(new ObjectGetRequest() { Key = key });
 			return response.Location;
 		}
 

+ 2 - 2
Server/Model/Component/Unit/LockComponent.cs

@@ -83,7 +83,7 @@ namespace Model
 				Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.address);
 				string serverAddress = Game.Scene.GetComponent<StartConfigComponent>().StartConfig.ServerIP;
 				G2G_LockRequest request = new G2G_LockRequest { Id = this.Owner.Id, Address = serverAddress };
-				await session.Call<G2G_LockRequest, G2G_LockResponse>(request);
+				await session.Call<G2G_LockResponse>(request);
 
 				this.status = LockStatus.Locked;
 
@@ -110,7 +110,7 @@ namespace Model
 			this.status = LockStatus.LockedNot;
 			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.address);
 			G2G_LockReleaseRequest request = new G2G_LockReleaseRequest();
-			await session.Call<G2G_LockReleaseRequest, G2G_LockReleaseResponse>(request);
+			await session.Call<G2G_LockReleaseResponse>(request);
 		}
 	}
 }

+ 185 - 31
Server/Model/Entity/ActorProxy.cs

@@ -1,68 +1,222 @@
-using System;
+using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace Model
 {
+	public abstract class ActorTask
+	{
+		public abstract Task<AResponse> Run();
+
+		public abstract void RunFail(int error);
+	}
+
+	/// <summary>
+	/// 普通消息,不需要response
+	/// </summary>
+	public class ActorMessageTask: ActorTask
+	{
+		private readonly ActorProxy proxy;
+		private readonly ARequest message;
+
+		public ActorMessageTask(ActorProxy proxy, ARequest message)
+		{
+			this.proxy = proxy;
+			this.message = message;
+		}
+
+		public override async Task<AResponse> Run()
+		{
+			AResponse response = await this.proxy.RealCall<ActorMessageResponse>(this.message, this.proxy.CancellationTokenSource.Token);
+			return response;
+		}
+
+		public override void RunFail(int error)
+		{
+		}
+	}
+
+	/// <summary>
+	/// Rpc消息,需要等待返回
+	/// </summary>
+	/// <typeparam name="Response"></typeparam>
+	public class ActorRpcTask<Response> : ActorTask where Response: AActorResponse
+	{
+		private readonly ActorProxy proxy;
+		private readonly AActorRequest message;
+
+		public readonly TaskCompletionSource<Response> Tcs = new TaskCompletionSource<Response>();
+
+		public ActorRpcTask(ActorProxy proxy, AActorRequest message)
+		{
+			this.proxy = proxy;
+			this.message = message;
+		}
+
+		public override async Task<AResponse> Run()
+		{
+			Response response = await this.proxy.RealCall<Response>(this.message, this.proxy.CancellationTokenSource.Token);
+			if (response.Error != ErrorCode.ERR_NotFoundActor)
+			{
+				this.Tcs.SetResult(response);
+			}
+			return response;
+		}
+
+		public override void RunFail(int error)
+		{
+			this.Tcs.SetException(new RpcException(error, ""));
+		}
+	}
+
 	public sealed class ActorProxy : Entity
 	{
+		// actor的地址
 		public string Address;
+
+		// 已发送等待回应的消息
+		public Queue<ActorTask> RunningTasks;
+
+		// 还没发送的消息
+		public Queue<ActorTask> WaitingTasks;
+
+		// 发送窗口大小
+		public int WindowSize = 1;
+
+		// 最大窗口
+		public const int MaxWindowSize = 100;
+
+		private TaskCompletionSource<ActorTask> tcs;
+
+		public CancellationTokenSource CancellationTokenSource;
+
+		private int failTimes;
 		
 		public ActorProxy(long id): base(id)
 		{
+			this.UpdateAsync();
 		}
 
-		public void Send<Message>(Message message) where Message : AActorMessage
+		private void Add(ActorTask task)
 		{
-			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.Address);
-			session.Send(message);
+			this.WaitingTasks.Enqueue(task);
+			this.AllowGet();
 		}
 
-		public async Task<Response> Call<Request, Response>(Request request) where Request : AActorRequest where Response: AActorResponse
+		private void Remove()
 		{
-			try
+			this.RunningTasks.Dequeue();
+			this.AllowGet();
+		}
+
+		private void AllowGet()
+		{
+			if (this.tcs == null || this.WaitingTasks.Count <= 0 || this.RunningTasks.Count >= this.WindowSize)
 			{
-				Response response = null;
-				if (this.Address == "")
-				{
-					this.Address = await this.Parent.GetComponent<LocationProxyComponent>().Get(this.Id);
-				}
-				response = await OnceCall<Request, Response>(0, request);
-				return response;
+				return;
 			}
-			catch (RpcException e)
+
+			var t = this.tcs;
+			this.tcs = null;
+			ActorTask task = this.WaitingTasks.Dequeue();
+			this.RunningTasks.Enqueue(task);
+			t.SetResult(task);
+		}
+
+		private Task<ActorTask> GetAsync()
+		{
+			if (this.WaitingTasks.Count > 0)
 			{
-				Console.WriteLine(e);
-				throw;
+				ActorTask task = this.WaitingTasks.Dequeue();
+				this.RunningTasks.Enqueue(task);
+				return Task.FromResult(task);
 			}
+
+			this.tcs = new TaskCompletionSource<ActorTask>();
+			return this.tcs.Task;
 		}
 
-		public async Task<Response> OnceCall<Request, Response>(int retryTime, Request request) where Request : AActorRequest where Response : AActorResponse
+		private async void UpdateAsync()
 		{
-			Response response = null;
-			if (retryTime > 0)
+			while (true)
 			{
-				await this.Parent.GetComponent<TimerComponent>().WaitAsync(retryTime * 500);
-				this.Address = await this.Parent.GetComponent<LocationProxyComponent>().Get(this.Id);
+				ActorTask actorTask = await this.GetAsync();
+				this.RunTask(actorTask);
 			}
-			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.Address);
-			response = await session.Call<Request, Response>(request);
+		}
 
-			if (response.Error == ErrorCode.ERR_Success)
+		private async void RunTask(ActorTask task)
+		{
+			AResponse response = await task.Run();
+
+			// 如果没找到Actor,发送窗口减少为1,重试
+			if (response.Error == ErrorCode.ERR_NotFoundActor)
 			{
-				return response;
+				this.CancellationTokenSource.Cancel();
+				this.WindowSize = 1;
+				++this.failTimes;
+
+				while (this.WaitingTasks.Count > 0)
+				{
+					ActorTask actorTask = this.WaitingTasks.Dequeue();
+					this.RunningTasks.Enqueue(actorTask);
+				}
+				ObjectHelper.Swap(ref this.RunningTasks, ref this.WaitingTasks);
+
+				// 失败3次则清空actor发送队列,返回失败
+				if (this.failTimes > 3)
+				{
+					while (this.WaitingTasks.Count > 0)
+					{
+						ActorTask actorTask = this.WaitingTasks.Dequeue();
+						actorTask.RunFail(response.Error);
+					}
+					return;
+				}
+
+				// 等待一会再发送
+				await this.Parent.GetComponent<TimerComponent>().WaitAsync(this.failTimes * 500);
+				this.Address = await this.Parent.GetComponent<LocationProxyComponent>().Get(this.Id);
+				this.CancellationTokenSource = new CancellationTokenSource();
+				this.AllowGet();
+				return;
 			}
 
-			if (retryTime >= 3)
+			// 发送成功
+			this.failTimes = 0;
+			if (this.WindowSize < MaxWindowSize)
 			{
-				throw new RpcException(response.Error, response.Message);
+				++this.WindowSize;
 			}
+			this.Remove();
+		}
 
-			if (response.Error == ErrorCode.ERR_NotFoundActor)
+		public void Send(AActorMessage message)
+		{
+			ActorMessageTask task = new ActorMessageTask(this, message);
+			this.Add(task);
+		}
+
+		public Task<Response> Call<Response>(AActorRequest request)where Response : AActorResponse
+		{
+			ActorRpcTask<Response> task = new ActorRpcTask<Response>(this, request);
+			this.Add(task);
+			return task.Tcs.Task;
+		}
+
+		public async Task<Response> RealCall<Response>(ARequest request, CancellationToken cancellationToken) where Response: AResponse
+		{
+			try
+			{
+				Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.Address);
+				Response response = await session.Call<Response>(request, cancellationToken);
+				return response;
+			}
+			catch (RpcException e)
 			{
-				response = await OnceCall<Request, Response>(++retryTime, request);
+				Log.Error(e.ToString());
+				throw;
 			}
-
-			throw new RpcException(response.Error, response.Message);
 		}
 
 		public override void Dispose()

+ 1 - 1
Server/Model/Entity/Message/InnerMessage.cs

@@ -60,7 +60,7 @@ namespace Model
{
	[Message(Opcode.R2G_GetLoginKey)]
	[BsonIgnoreExtraElements]
 
 	[Message(Opcode.ObjectLockResponse)]
	[BsonIgnoreExtraElements]
	public class ObjectLockResponse : AResponse
	{
	}
 
-	[Message(Opcode.ObjectUnLockRequest)]
	[BsonIgnoreExtraElements]
	public class ObjectUnLockRequest : ARequest
	{
		public long Key { get; set; }
	}
+	[Message(Opcode.ObjectUnLockRequest)]
	[BsonIgnoreExtraElements]
	public class ObjectUnLockRequest : ARequest
	{
		public long Key { get; set; }
		public string Value { get; set; }
	}
 
 	[Message(Opcode.ObjectUnLockResponse)]
	[BsonIgnoreExtraElements]
	public class ObjectUnLockResponse : AResponse
	{
	}
 

+ 2 - 2
Server/Model/Entity/Session.cs

@@ -123,7 +123,7 @@ namespace Model
 		/// <summary>
 		/// Rpc调用
 		/// </summary>
-		public Task<Response> Call<Request, Response>(Request request, CancellationToken cancellationToken) where Request : ARequest
+		public Task<Response> Call<Response>(ARequest request, CancellationToken cancellationToken)
 			where Response : AResponse
 		{
 			request.RpcId = ++RpcId;
@@ -158,7 +158,7 @@ namespace Model
 		/// <summary>
 		/// Rpc调用,发送一个消息,等待返回一个消息
 		/// </summary>
-		public Task<Response> Call<Request, Response>(Request request) where Request : ARequest where Response : AResponse
+		public Task<Response> Call<Response>(ARequest request) where Response : AResponse
 		{
 			request.RpcId = ++RpcId;
 			this.SendMessage(request);

+ 5 - 1
Server/Model/Message/AActorMessage.cs

@@ -7,11 +7,15 @@ namespace Model
 		long Id { get; set; }
 	}
 
-	public abstract class AActorMessage: AMessage, IActorMessage
+	public abstract class AActorMessage: ARequest, IActorMessage
 	{
 		public long Id { get; set; }
 	}
 
+	public abstract class ActorMessageResponse : AResponse
+	{
+	}
+
 	public abstract class AActorRequest : ARequest, IActorMessage
 	{
 		[BsonIgnoreIfDefault]

+ 1 - 1
Server/Model/Message/AMessage.cs

@@ -4,7 +4,7 @@
 	{
 	}
 
-	public abstract class ARequest
+	public abstract class ARequest: AMessage
 	{
 		public uint RpcId;
 	}

+ 1 - 1
Server/Model/Message/OuterMessageDispatcher.cs

@@ -21,7 +21,7 @@ namespace Model
 				ActorProxy actorProxy = Game.Scene.GetComponent<ActorProxyComponent>().Get(aActorRequest.Id);
 				aActorRequest.Id = session.GetComponent<SessionGamerComponent>().Gamer.Id;
 				uint rpcId = aActorRequest.RpcId;
-				AActorResponse aActorResponse = await actorProxy.Call<AActorRequest, AActorResponse>(aActorRequest);
+				AActorResponse aActorResponse = await actorProxy.Call<AActorResponse>(aActorRequest);
 				aActorResponse.RpcId = rpcId;
 				session.Reply(aActorResponse);
 				return;