Prechádzať zdrojové kódy

增加分布式锁组件,用来处理对象镜像在不同服务器时修改数据一致性问题

tanghai 9 rokov pred
rodič
commit
9bf812e6e3

+ 16 - 9
Server/Controller/Message/C2G_LoginGateHandler.cs

@@ -9,17 +9,24 @@ namespace Controller
 	{
 		protected override async void Run(Session session, C2G_LoginGate message, Action<G2C_LoginGate> reply)
 		{
-			bool isCheckOK = Game.Scene.GetComponent<GateSessionKeyComponent>().Check(message.Key);
-			G2C_LoginGate g2CLoginGate = new G2C_LoginGate();
-			if (!isCheckOK)
+			G2C_LoginGate response = new G2C_LoginGate();
+			try
 			{
-				g2CLoginGate.Error = ErrorCode.ERR_ConnectGateKeyError;
-				g2CLoginGate.Message = "Gate key验证失败!";
-			}
-			reply(g2CLoginGate);
+				bool isCheckOK = Game.Scene.GetComponent<GateSessionKeyComponent>().Check(message.Key);
+				if (!isCheckOK)
+				{
+					response.Error = ErrorCode.ERR_ConnectGateKeyError;
+					response.Message = "Gate key验证失败!";
+				}
+				reply(response);
 
-			await Game.Scene.GetComponent<TimerComponent>().WaitAsync(5000);
-			session.Dispose();
+				await Game.Scene.GetComponent<TimerComponent>().WaitAsync(5000);
+				session.Dispose();
+			}
+			catch (Exception e)
+			{
+				ReplyError(response, e, reply);
+			}
 		}
 	}
 }

+ 3 - 4
Server/Controller/Message/C2M_ReloadHandler.cs

@@ -9,7 +9,7 @@ namespace Controller
 	{
 		protected override async void Run(Session session, C2M_Reload message, Action<M2C_Reload> reply)
 		{
-			M2C_Reload m2CReload = new M2C_Reload();
+			M2C_Reload response = new M2C_Reload();
 			try
 			{
 				StartConfigComponent startConfigComponent = Game.Scene.GetComponent<StartConfigComponent>();
@@ -24,13 +24,12 @@ namespace Controller
 					Session serverSession = netInnerComponent.Get(innerConfig.Address);
 					await serverSession.Call<M2A_Reload, A2M_Reload>(new M2A_Reload());
 				}
+				reply(response);
 			}
 			catch (Exception e)
 			{
-				m2CReload.Error = ErrorCode.ERR_ReloadFail;
-				m2CReload.Message = e.ToString();
+				ReplyError(response, e, reply);
 			}
-			reply(m2CReload);
 		}
 	}
 }

+ 27 - 18
Server/Controller/Message/C2R_LoginHandler.cs

@@ -9,26 +9,35 @@ namespace Controller
 	{
 		protected override async void Run(Session session, C2R_Login message, Action<R2C_Login> reply)
 		{
-			R2C_Login r2CLogin;
-			if (message.Account != "abcdef" || message.Password != "111111")
+			R2C_Login response = new R2C_Login();
+			try
 			{
-				r2CLogin = new R2C_Login {Error = ErrorCode.ERR_AccountOrPasswordError, Message = "账号名或者密码错误!"};
-				reply(r2CLogin);
-				return;
-			}
+				if (message.Account != "abcdef" || message.Password != "111111")
+				{
+					response.Error = ErrorCode.ERR_AccountOrPasswordError;
+					reply(response);
+					return;
+				}
+
+				// 随机分配一个Gate
+				StartConfig config = Game.Scene.GetComponent<RealmGateAddressComponent>().GetAddress();
+				//Log.Debug($"gate address: {MongoHelper.ToJson(config)}");
+				string innerAddress = $"{config.GetComponent<InnerConfig>().Host}:{config.GetComponent<InnerConfig>().Port}";
+				Session gateSession = Game.Scene.GetComponent<NetInnerComponent>().Get(innerAddress);
+				
+				// 向gate请求一个key,客户端可以拿着这个key连接gate
+				G2R_GetLoginKey g2RGetLoginKey = await gateSession.Call<R2G_GetLoginKey, G2R_GetLoginKey>(new R2G_GetLoginKey());
+				
+				string outerAddress = $"{config.GetComponent<OuterConfig>().Host}:{config.GetComponent<OuterConfig>().Port}";
 
-			// 随机分配一个Gate
-			StartConfig config = Game.Scene.GetComponent<RealmGateAddressComponent>().GetAddress();
-			//Log.Debug($"gate address: {MongoHelper.ToJson(config)}");
-			string innerAddress = $"{config.GetComponent<InnerConfig>().Host}:{config.GetComponent<InnerConfig>().Port}";
-			Session gateSession = Game.Scene.GetComponent<NetInnerComponent>().Get(innerAddress);
-			
-			// 向gate请求一个key,客户端可以拿着这个key连接gate
-			G2R_GetLoginKey g2RGetLoginKey = await gateSession.Call<R2G_GetLoginKey, G2R_GetLoginKey>(new R2G_GetLoginKey());
-			
-			string outerAddress = $"{config.GetComponent<OuterConfig>().Host}:{config.GetComponent<OuterConfig>().Port}";
-			r2CLogin = new R2C_Login {Address = outerAddress, Key = g2RGetLoginKey.Key};
-			reply(r2CLogin);
+				response.Address = outerAddress;
+				response.Key = g2RGetLoginKey.Key;
+				reply(response);
+			}
+			catch (Exception e)
+			{
+				ReplyError(response, e, reply);
+			}
 		}
 	}
 }

+ 25 - 0
Server/Controller/Message/G2G_LockReleaseRequestHandler.cs

@@ -0,0 +1,25 @@
+using System;
+using Base;
+using Model;
+
+namespace Controller
+{
+	[MessageHandler(AppType.Gate)]
+	public class G2G_LockReleaseRequestHandler : AMRpcHandler<G2G_LockReleaseRequest, G2G_LockReleaseResponse>
+	{
+		protected override void Run(Session session, G2G_LockReleaseRequest message, Action<G2G_LockReleaseResponse> reply)
+		{
+			G2G_LockReleaseResponse g2GLockReleaseResponse = new G2G_LockReleaseResponse();
+			
+			Unit unit = Game.Scene.GetComponent<UnitComponent>().Get(message.Id);
+			if (unit == null)
+			{
+				g2GLockReleaseResponse.Error = ErrorCode.ERR_NotFoundUnit;
+				reply(g2GLockReleaseResponse);
+			}
+
+			unit.GetComponent<MasterComponent>().Release(message.Address);
+			reply(g2GLockReleaseResponse);
+		}
+	}
+}

+ 32 - 0
Server/Controller/Message/G2G_LockRequestHandler.cs

@@ -0,0 +1,32 @@
+using System;
+using Base;
+using Model;
+
+namespace Controller
+{
+	[MessageHandler(AppType.Gate)]
+	public class G2G_LockRequestHandler : AMRpcHandler<G2G_LockRequest, G2G_LockResponse>
+	{
+		protected override async void Run(Session session, G2G_LockRequest message, Action<G2G_LockResponse> reply)
+		{
+			G2G_LockResponse response = new G2G_LockResponse();
+			try
+			{
+				Unit unit = Game.Scene.GetComponent<UnitComponent>().Get(message.Id);
+				if (unit == null)
+				{
+					response.Error = ErrorCode.ERR_NotFoundUnit;
+					reply(response);
+				}
+
+				await unit.GetComponent<MasterComponent>().Lock(message.Address);
+
+				reply(response);
+			}
+			catch (Exception e)
+			{
+				ReplyError(response, e, reply);
+			}
+		}
+	}
+}

+ 5 - 4
Server/Controller/Message/M2A_ReloadHandler.cs

@@ -9,19 +9,20 @@ namespace Controller
 	{
 		protected override void Run(Session session, M2A_Reload message, Action<A2M_Reload> reply)
 		{
-			A2M_Reload a2MReload = new A2M_Reload();
+			A2M_Reload response = new A2M_Reload();
 			try
 			{
 				Game.DisposerEventManager.Register("Controller", DllHelper.GetController());
+				reply(response);
 			}
 			catch (Exception e)
 			{
-				a2MReload.Error = ErrorCode.ERR_ReloadFail;
+				response.Error = ErrorCode.ERR_ReloadFail;
 				StartConfig myStartConfig = Game.Scene.GetComponent<StartConfigComponent>().StartConfig;
 				InnerConfig innerConfig = myStartConfig.GetComponent<InnerConfig>();
-				a2MReload.Message = $"{innerConfig.Address} reload fail, {e}";
+				response.Message = $"{innerConfig.Address} reload fail, {e}";
+				reply(response);
 			}
-			reply(a2MReload);
 		}
 	}
 }

+ 12 - 3
Server/Controller/Message/R2G_GetLoginKeyHandler.cs

@@ -1,4 +1,5 @@
 using System;
+using System.Threading.Tasks;
 using Base;
 using Model;
 
@@ -9,9 +10,17 @@ namespace Controller
 	{
 		protected override void Run(Session session, R2G_GetLoginKey message, Action<G2R_GetLoginKey> reply)
 		{
-			long key = Game.Scene.GetComponent<GateSessionKeyComponent>().Get();
-			G2R_GetLoginKey g2RGetLoginKey = new G2R_GetLoginKey(key);
-			reply(g2RGetLoginKey);
+			G2R_GetLoginKey response = new G2R_GetLoginKey();
+			try
+			{
+				long key = Game.Scene.GetComponent<GateSessionKeyComponent>().Get();
+				response.Key = key;
+				reply(response);
+			}
+			catch (Exception e)
+			{
+				ReplyError(response, e, reply);
+			}
 		}
 	}
 }

+ 2 - 0
Server/Controller/Server.Controller.csproj

@@ -40,8 +40,10 @@
     <Compile Include="Component\RealmGateAddressComponent.cs" />
     <Compile Include="Message\C2G_LoginGateHandler.cs" />
     <Compile Include="Message\C2R_PingHandler.cs" />
+    <Compile Include="Message\G2G_LockReleaseRequestHandler.cs" />
     <Compile Include="Message\M2A_ReloadHandler.cs" />
     <Compile Include="Message\C2M_ReloadHandler.cs" />
+    <Compile Include="Message\G2G_LockRequestHandler.cs" />
     <Compile Include="Message\R2G_GetLoginKeyHandler.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="Message\C2R_LoginHandler.cs" />

+ 110 - 0
Server/Model/Component/Unit/LockComponent.cs

@@ -0,0 +1,110 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Base;
+
+namespace Model
+{
+	public enum LockStatus
+	{
+		LockedNot,
+		LockRequesting,
+		Locked,
+	}
+
+	/// <summary>
+	/// 分布式锁组件,Unit对象可能在不同进程上有镜像,访问该对象的时候需要对他加锁
+	/// </summary>
+	[DisposerEvent(typeof(LockComponent))]
+	public class LockComponent: Component
+	{
+		private LockStatus status = LockStatus.LockedNot;
+		private string address;
+		private int lockCount;
+		private readonly Queue<TaskCompletionSource<bool>> queue = new Queue<TaskCompletionSource<bool>>();
+
+		public void Awake(string addr)
+		{
+			this.address = addr;
+		}
+		
+		public async Task Lock()
+		{
+			++this.lockCount;
+
+			if (this.status == LockStatus.Locked)
+			{
+				return;
+			}
+			if (this.status == LockStatus.LockRequesting)
+			{
+				await WaitLock();
+				return;
+			}
+			
+			this.status = LockStatus.LockRequesting;
+
+			// 真身直接本地请求锁,镜像需要调用Rpc获取锁
+			MasterComponent masterComponent = this.GetComponent<MasterComponent>();
+			if (masterComponent != null)
+			{
+				await masterComponent.Lock(this.address);
+			}
+			else
+			{
+				RequestLock();
+				await WaitLock();
+			}
+		}
+
+		private Task<bool> WaitLock()
+		{
+			TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
+			if (this.status == LockStatus.Locked)
+			{
+				tcs.SetResult(true);
+				return tcs.Task;
+			}
+
+			this.queue.Enqueue(tcs);
+			return tcs.Task;
+		}
+
+		private async void RequestLock()
+		{
+			try
+			{
+				Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.address);
+				string serverAddress = Game.Scene.GetComponent<StartConfigComponent>().StartConfig.ServerIP;
+				G2G_LockRequest request = new G2G_LockRequest { Id = this.Owner.Id, Address = serverAddress };
+				await session.Call<G2G_LockRequest, G2G_LockResponse>(request);
+
+				this.status = LockStatus.Locked;
+
+				foreach (TaskCompletionSource<bool> taskCompletionSource in this.queue)
+				{
+					taskCompletionSource.SetResult(true);
+				}
+				this.queue.Clear();
+			}
+			catch (Exception e)
+			{
+				Log.Error($"获取锁失败: {this.address} {this.Owner.Id} {e}");
+			}
+		}
+
+		public async Task Release()
+		{
+			--this.lockCount;
+			if (this.lockCount != 0)
+			{
+				return;
+			}
+
+			this.status = LockStatus.LockedNot;
+			Session session = Game.Scene.GetComponent<NetInnerComponent>().Get(this.address);
+			G2G_LockReleaseRequest request = new G2G_LockReleaseRequest();
+			await session.Call<G2G_LockReleaseRequest, G2G_LockReleaseResponse>(request);
+		}
+	}
+}

+ 72 - 0
Server/Model/Component/Unit/MasterComponent.cs

@@ -0,0 +1,72 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Base;
+
+namespace Model
+{
+	public class LockInfo
+	{
+		public string Address;
+		public TaskCompletionSource<bool> Tcs;
+
+		public LockInfo(string address, TaskCompletionSource<bool> tcs)
+		{
+			this.Address = address;
+			this.Tcs = tcs;
+		}
+	}
+
+	[DisposerEvent(typeof(MasterComponent))]
+	public class MasterComponent : Component
+	{
+		private readonly List<string> slavesAddress = new List<string>();
+
+		private string lockedAddress = "";
+
+		private readonly Queue<LockInfo> queue = new Queue<LockInfo>();
+
+		public void AddSlave(string address)
+		{
+			this.slavesAddress.Add(address);
+		}
+
+		public void RemoveSlave(string address)
+		{
+			this.slavesAddress.Remove(address);
+		}
+
+		public Task<bool> Lock(string address)
+		{
+			TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
+			
+			if (this.lockedAddress == "")
+			{
+				this.lockedAddress = address;
+				tcs.SetResult(true);
+			}
+			else
+			{
+				LockInfo lockInfo = new LockInfo(address, tcs);
+				this.queue.Enqueue(lockInfo);
+			}
+			return tcs.Task;
+		}
+
+		public void Release(string address)
+		{
+			if (this.lockedAddress != address)
+			{
+				Log.Error($"解锁地址与锁地址不匹配! {this.lockedAddress} {address}");
+				return;
+			}
+			if (this.queue.Count == 0)
+			{
+				this.lockedAddress = "";
+				return;
+			}
+			LockInfo lockInfo = this.queue.Dequeue();
+			this.lockedAddress = lockInfo.Address;
+			lockInfo.Tcs.SetResult(true);
+		}
+	}
+}

+ 5 - 0
Server/Model/Server.Model.csproj

@@ -80,6 +80,9 @@
     <Compile Include="..\..\Unity\Assets\Scripts\Component\TimerComponent.cs">
       <Link>Component\TimerComponent.cs</Link>
     </Compile>
+    <Compile Include="..\..\Unity\Assets\Scripts\Component\UnitComponent.cs">
+      <Link>Component\UnitComponent.cs</Link>
+    </Compile>
     <Compile Include="..\..\Unity\Assets\Scripts\Config\ACategory.cs">
       <Link>Config\ACategory.cs</Link>
     </Compile>
@@ -197,6 +200,8 @@
     <Compile Include="..\..\Unity\Assets\Scripts\Other\Options.cs">
       <Link>Other\Options.cs</Link>
     </Compile>
+    <Compile Include="Component\Unit\MasterComponent.cs" />
+    <Compile Include="Component\Unit\LockComponent.cs" />
     <Compile Include="Component\AppManagerComponent.cs" />
     <Compile Include="Component\GateSessionKeyComponent.cs" />
     <Compile Include="Component\RealmGateAddressComponent.cs" />

+ 40 - 0
Unity/Assets/Scripts/Component/UnitComponent.cs

@@ -0,0 +1,40 @@
+using System.Collections.Generic;
+
+namespace Model
+{
+	public class UnitComponent: Component
+	{
+		private readonly Dictionary<long, Unit> idUnits = new Dictionary<long, Unit>();
+
+		public void Add(Unit unit)
+		{
+			this.idUnits.Add(unit.Id, unit);
+		}
+
+		public Unit Get(long id)
+		{
+			Unit unit;
+			this.idUnits.TryGetValue(id, out unit);
+			return unit;
+		}
+
+		public void Remove(long id)
+		{
+			this.idUnits.Remove(id);
+		}
+
+		public override void Dispose()
+		{
+			if (this.Id == 0)
+			{
+				return;
+			}
+			base.Dispose();
+
+			foreach (Unit unit in this.idUnits.Values)
+			{
+				unit.Dispose();
+			}
+		}
+	}
+}

+ 12 - 0
Unity/Assets/Scripts/Component/UnitComponent.cs.meta

@@ -0,0 +1,12 @@
+fileFormatVersion: 2
+guid: d0edacd02e5460b4fa72b01dba5e4083
+timeCreated: 1480324632
+licenseType: Pro
+MonoImporter:
+  serializedVersion: 2
+  defaultReferences: []
+  executionOrder: 0
+  icon: {instanceID: 0}
+  userData: 
+  assetBundleName: 
+  assetBundleVariant: 

+ 23 - 2
Unity/Assets/Scripts/Entity/Message/InnerMessage.cs

@@ -1,6 +1,6 @@
 using MongoDB.Bson.Serialization.Attributes;
-
-// 服务器内部消息 Opcode从10000开始
+
+// 服务器内部消息 Opcode从10000开始
 
 namespace Model
 {
@@ -15,6 +15,7 @@ namespace Model
 	public class G2R_GetLoginKey : AResponse
 	{
 		public long Key;
+
		public G2R_GetLoginKey()
		{
		}
 
 		public G2R_GetLoginKey(long key)
 		{
@@ -32,5 +33,25 @@ namespace Model
 	[BsonIgnoreExtraElements]
 	public class A2M_Reload : AResponse
 	{
+	}
+
+	[Message(10005)]
+	[BsonIgnoreExtraElements]
+	public class G2G_LockRequest : ARequest
	{
		public long Id;
		public string Address;
	}
+
+	[Message(10006)]
+	[BsonIgnoreExtraElements]
+	public class G2G_LockResponse : AResponse
+	{
+	}
+
+	[Message(10007)]
+	[BsonIgnoreExtraElements]
+	public class G2G_LockReleaseRequest : ARequest
	{
		public long Id;
		public string Address;
	}
+
+	[Message(10008)]
+	[BsonIgnoreExtraElements]
+	public class G2G_LockReleaseResponse : AResponse
+	{
 	}
 }

+ 8 - 0
Unity/Assets/Scripts/Message/AMHandler.cs

@@ -32,6 +32,14 @@ namespace Model
 			where Request : ARequest
 			where Response: AResponse
 	{
+		protected static void ReplyError(Response response, Exception e, Action<Response> reply)
+		{
+			Log.Error(e.ToString());
+			response.Error = ErrorCode.ERR_RpcFail;
+			response.Message = e.ToString();
+			reply(response);
+		}
+
 		protected abstract void Run(Session session, Request message, Action<Response> reply);
 
 		public void Handle(Session session, MessageInfo messageInfo)

+ 1 - 4
Unity/Assets/Scripts/Message/AMessage.cs

@@ -1,10 +1,7 @@
 namespace Model
 {
-	public abstract class AMessage: Object
+	public abstract class AMessage
 	{
-		protected AMessage(): base(0)
-		{
-		}
 	}
 
 	public abstract class ARequest : AMessage

+ 5 - 3
Unity/Assets/Scripts/Message/ErrorCode.cs

@@ -3,8 +3,10 @@ namespace Model
 	public static class ErrorCode
 	{
 		public const int ERR_Success = 0;
-		public const int ERR_AccountOrPasswordError = 1;
-		public const int ERR_ConnectGateKeyError = 2;
-		public const int ERR_ReloadFail = 3;
+		public const int ERR_RpcFail = 1;
+		public const int ERR_AccountOrPasswordError = 2;
+		public const int ERR_ConnectGateKeyError = 3;
+		public const int ERR_ReloadFail = 4;
+		public const int ERR_NotFoundUnit = 5;
 	}
 }

+ 1 - 0
Unity/Unity.csproj

@@ -99,6 +99,7 @@
     <Compile Include="Assets\Scripts\Component\TimeComponent.cs" />
     <Compile Include="Assets\Scripts\Component\TimerComponent.cs" />
     <Compile Include="Assets\Scripts\Component\UIComponent.cs" />
+    <Compile Include="Assets\Scripts\Component\UnitComponent.cs" />
     <Compile Include="Assets\Scripts\Config\ACategory.cs" />
     <Compile Include="Assets\Scripts\Config\AConfig.cs" />
     <Compile Include="Assets\Scripts\Config\AConfigComponent.cs" />