Просмотр исходного кода

1. 增加Process调度器的机制,可以自己实现Process的调度
2. 可以停止调度器,比如重新加载配置,重新加载代码等等需要停止调度器,放置线程竞争

tanghai 2 лет назад
Родитель
Сommit
2148925f8b

+ 1 - 1
Share/Tool/Init.cs

@@ -22,7 +22,7 @@ namespace ET.Server
                     .WithParsed(Game.Instance.AddSingleton);
                 Game.Instance.AddSingleton<Logger>().ILog = new NLogger(Options.Instance.AppType.ToString(), Options.Instance.Process, "../Config/NLog/NLog.config");
                 
-                Process process = Game.Instance.Create(false);
+                Process process = Game.Instance.Create();
                 // 异步方法全部会回掉到主线程
                 process.AddSingleton<MainThreadSynchronizationContext>();
                 process.AddSingleton<TimeInfo>();

+ 185 - 103
Unity/Assets/Scripts/Core/Game/Game.cs

@@ -6,8 +6,27 @@ using System.Threading.Tasks;
 
 namespace ET
 {
-    public class Game
+    public class Game: IDisposable
     {
+        // 用来卡住所有的Process的执行
+        public struct Locker: IDisposable
+        {
+            public Locker(int _ = 0)
+            {
+                Monitor.Enter(Instance);
+            
+                // 停止调度
+                Instance.StopScheduler();
+            }
+        
+            public void Dispose()
+            {
+                Instance.StartScheduler();
+                Monitor.Exit(Instance);
+            }
+        }
+        
+        
         [StaticField]
         public static Game Instance = new();
         
@@ -15,143 +34,125 @@ namespace ET
         {
         }
         
-        private readonly ConcurrentStack<ISingleton> singletons = new();
+        private readonly Stack<ISingleton> singletons = new();
 
-        private readonly ConcurrentQueue<ISingleton> updates = new();
+        private readonly Queue<ISingleton> updates = new();
 
-        private readonly ConcurrentQueue<ISingleton> lateUpdates = new();
+        private readonly Queue<ISingleton> lateUpdates = new();
 
-        private readonly ConcurrentQueue<ISingleton> loads = new();
-
-        #region 线程安全
-
-        private bool needLoad;
+        private readonly Queue<ISingleton> loads = new();
         
-        private readonly ConcurrentQueue<Process> loops = new();
+        private readonly Queue<ISingleton> schedulers = new();
 
-        private readonly ConcurrentDictionary<int, Process> processes = new();
-        
-        private readonly Queue<ETTask> frameFinishTask = new();
+        private readonly Dictionary<int, Process> processes = new();
 
         private int idGenerator;
-
-        public Process Create(bool loop = true)
+        
+        public Process Create()
         {
-            int id = Interlocked.Increment(ref this.idGenerator);
-            Process process = new(id);
-            this.processes.TryAdd(process.Id, process);
-            if (loop)
+            lock (this)
             {
-                this.loops.Enqueue(process);
+                int id = ++this.idGenerator;
+                Process process = new(id);
+                this.processes.TryAdd(process.Id, process);
+                return process;
             }
-            return process;
         }
         
         public void Remove(int id)
         {
-            if (this.processes.Remove(id, out Process process))
+            lock (this)
             {
-                process.Dispose();    
+                if (this.processes.Remove(id, out Process process))
+                {
+                    process.Dispose();
+                }
             }
         }
-        
-        public async ETTask WaitGameFrameFinish()
-        {
-            ETTask task = ETTask.Create(true);
-            frameFinishTask.Enqueue(task);
-            await task;
-        }
-        
-        private void FrameFinishUpdateInner()
+
+        public void Loop()
         {
-            while (frameFinishTask.Count > 0)
+            lock (this)
             {
-                ETTask task = frameFinishTask.Dequeue();
-                task.SetResult();
+                this.Update();
+                this.LateUpdate();
             }
         }
-
-        public void Load()
-        {
-            this.needLoad = true;
-        }
         
-        // 简单线程调度,每次Loop会把所有Process Loop一遍
-        public void Loop()
+        public T AddSingleton<T>() where T: Singleton<T>, new()
         {
-            int count = this.loops.Count;
-
-            using Barrier barrier = new(1);
-            
-            while (count-- > 0)
+            lock (this)
             {
-                this.loops.TryDequeue(out Process process);
-                if (process == null)
+                ISingleton singleton = new T();
+                singleton.Register();
+
+                singletons.Push(singleton);
+
+                if (singleton is ISingletonAwake awake)
                 {
-                    continue;
+                    awake.Awake();
                 }
-                barrier.AddParticipant();
-                process.Barrier = barrier;
-                if (process.Id == 0)
+
+                if (singleton is ISingletonUpdate)
                 {
-                    continue;
+                    updates.Enqueue(singleton);
                 }
-                this.loops.Enqueue(process);
-                ThreadPool.QueueUserWorkItem(process.Loop);
-            }
 
-            barrier.SignalAndWait();
-            
-            // 此时没有线程竞争,进行 Load Update LateUpdate等操作
-            if (this.needLoad)
-            {
-                this.needLoad = false;
-                this.LoadInner();
-            }
-            this.UpdateInner();
-            this.LateUpdateInner();
-            this.FrameFinishUpdateInner();
-        }
+                if (singleton is ISingletonLateUpdate)
+                {
+                    lateUpdates.Enqueue(singleton);
+                }
 
-        #endregion
-        
-        
-        // 为了保证线程安全,只允许在Start之前AddSingleton,主要用于线程共用的一些东西
-        public T AddSingleton<T>() where T: Singleton<T>, new()
-        {
-            T singleton = new T();
-            AddSingleton(singleton);
-            return singleton;
+                if (singleton is ISingletonLoad)
+                {
+                    loads.Enqueue(singleton);
+                }
+                
+                if (singleton is ISingletonScheduler)
+                {
+                    this.schedulers.Enqueue(singleton);
+                }
+
+                return singleton as T;
+            }
         }
 
         public void AddSingleton(ISingleton singleton)
         {
-            singleton.Register();
-            
-            singletons.Push(singleton);
-            
-            if (singleton is ISingletonAwake awake)
+            lock (this)
             {
-                awake.Awake();
-            }
-            
-            if (singleton is ISingletonUpdate)
-            {
-                updates.Enqueue(singleton);
-            }
-            
-            if (singleton is ISingletonLateUpdate)
-            {
-                lateUpdates.Enqueue(singleton);
-            }
+                singleton.Register();
 
-            if (singleton is ISingletonLoad)
-            {
-                loads.Enqueue(singleton);
+                singletons.Push(singleton);
+
+                if (singleton is ISingletonAwake awake)
+                {
+                    awake.Awake();
+                }
+
+                if (singleton is ISingletonUpdate)
+                {
+                    updates.Enqueue(singleton);
+                }
+
+                if (singleton is ISingletonLateUpdate)
+                {
+                    lateUpdates.Enqueue(singleton);
+                }
+
+                if (singleton is ISingletonLoad)
+                {
+                    loads.Enqueue(singleton);
+                }
+                
+                if (singleton is ISingletonScheduler)
+                {
+                    this.schedulers.Enqueue(singleton);
+                }
             }
         }
 
-        private void UpdateInner()
+        private void Update()
         {
             int count = updates.Count;
             while (count-- > 0)
@@ -183,7 +184,7 @@ namespace ET
             }
         }
 
-        private void LateUpdateInner()
+        private void LateUpdate()
         {
             int count = lateUpdates.Count;
             while (count-- > 0)
@@ -215,8 +216,10 @@ namespace ET
             }
         }
 
-        private void LoadInner()
+        public void Load()
         {
+            using Locker _ = new();  // 执行Load需要停止所有的Process执行
+            
             int count = loads.Count;
             while (count-- > 0)
             {
@@ -224,7 +227,7 @@ namespace ET
                 {
                     continue;
                 }
-                
+
                 if (singleton.IsDisposed())
                 {
                     continue;
@@ -234,7 +237,7 @@ namespace ET
                 {
                     continue;
                 }
-                
+
                 loads.Enqueue(singleton);
                 try
                 {
@@ -246,5 +249,84 @@ namespace ET
                 }
             }
         }
+
+        private void StartScheduler()
+        {
+            int count = this.schedulers.Count;
+            while (count-- > 0)
+            {
+                if (!this.schedulers.TryDequeue(out ISingleton singleton))
+                {
+                    continue;
+                }
+
+                if (singleton.IsDisposed())
+                {
+                    continue;
+                }
+
+                if (singleton is not ISingletonScheduler scheduler)
+                {
+                    continue;
+                }
+
+                schedulers.Enqueue(singleton);
+                try
+                {
+                    scheduler.StartScheduler();
+                }
+                catch (Exception e)
+                {
+                    Log.Error(e);
+                }
+            }
+        }
+        
+        private void StopScheduler()
+        {
+            int count = this.schedulers.Count;
+            while (count-- > 0)
+            {
+                if (!this.schedulers.TryDequeue(out ISingleton singleton))
+                {
+                    continue;
+                }
+
+                if (singleton.IsDisposed())
+                {
+                    continue;
+                }
+
+                if (singleton is not ISingletonScheduler scheduler)
+                {
+                    continue;
+                }
+
+                schedulers.Enqueue(singleton);
+                try
+                {
+                    scheduler.StopScheduler();
+                }
+                catch (Exception e)
+                {
+                    Log.Error(e);
+                }
+            }
+            
+        }
+
+        public void Dispose()
+        {
+            using (Locker _ = new())
+            {
+                // 顺序反过来清理
+                while (singletons.Count > 0)
+                {
+                    ISingleton iSingleton = singletons.Pop();
+                    iSingleton.Destroy();
+                }
+            }
+            Instance = null;
+        }
     }
 }

+ 11 - 0
Unity/Assets/Scripts/Core/Game/ISingletonScheduler.cs

@@ -0,0 +1,11 @@
+namespace ET
+{
+    public interface ISingletonScheduler
+    {
+        void StartScheduler();
+        
+        void StopScheduler();
+
+        void Add(Process process);
+    }
+}

+ 11 - 0
Unity/Assets/Scripts/Core/Game/ISingletonScheduler.cs.meta

@@ -0,0 +1,11 @@
+fileFormatVersion: 2
+guid: 9c48490358ba9954b812abf6ed2f6b5b
+MonoImporter:
+  externalObjects: {}
+  serializedVersion: 2
+  defaultReferences: []
+  executionOrder: 0
+  icon: {instanceID: 0}
+  userData: 
+  assetBundleName: 
+  assetBundleVariant: 

+ 0 - 1
Unity/Assets/Scripts/Core/Game/Module/GameActor/GameActor.cs

@@ -1,7 +1,6 @@
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
-using System;
 
 namespace ET
 {

+ 8 - 0
Unity/Assets/Scripts/Core/Game/Module/ProcessScheduler.meta

@@ -0,0 +1,8 @@
+fileFormatVersion: 2
+guid: 28ffb6588744c4448960ab26c730f87f
+folderAsset: yes
+DefaultImporter:
+  externalObjects: {}
+  userData: 
+  assetBundleName: 
+  assetBundleVariant: 

+ 81 - 0
Unity/Assets/Scripts/Core/Game/Module/ProcessScheduler/ThreadPoolScheduler.cs

@@ -0,0 +1,81 @@
+using System.Collections.Generic;
+using System.Threading;
+
+namespace ET
+{
+    public class ThreadPoolScheduler: Singleton<ThreadPoolScheduler>, ISingletonScheduler, ISingletonUpdate
+    {
+        private bool isStart;
+        private HashSet<Process> Processes { get; } = new();
+        private readonly List<Process> removeProcesses = new();
+        private readonly ThreadSynchronizationContext threadSynchronizationContext = new();
+
+        public void StartScheduler()
+        {
+            this.isStart = true;
+        }
+        
+        public void Update()
+        {
+            if (!this.isStart)
+            {
+                return;
+            }
+            
+            this.threadSynchronizationContext.Update();
+            
+            removeProcesses.Clear();
+            foreach (Process process in this.Processes)
+            {
+                if (process.IsRuning)
+                {
+                    continue;
+                }
+
+                if (process.Id == 0)
+                {
+                    this.removeProcesses.Add(process);
+                }
+
+                process.IsRuning = true;
+                ThreadPool.QueueUserWorkItem(process.Loop);
+            }
+
+            foreach (Process process in this.removeProcesses)
+            {
+                this.Processes.Remove(process);
+            }
+        }
+
+        public void StopScheduler()
+        {
+            this.isStart = false;
+            
+            // 等待线程池中的Process Loop完成
+            while (true)
+            {
+                int count = 0;
+                
+                foreach (Process process in this.Processes)
+                {
+                    if (process.IsRuning)
+                    {
+                        break;
+                    }
+
+                    ++count;
+                }
+
+                if (count == this.Processes.Count)
+                {
+                    break;
+                }
+            }
+        }
+
+        public void Add(Process process)
+        {
+            threadSynchronizationContext.Post(()=>this.Processes.Add(process));
+        }
+    }
+}

+ 11 - 0
Unity/Assets/Scripts/Core/Game/Module/ProcessScheduler/ThreadPoolScheduler.cs.meta

@@ -0,0 +1,11 @@
+fileFormatVersion: 2
+guid: 7b43fc7847a1b0c4482d09dbb954d036
+MonoImporter:
+  externalObjects: {}
+  serializedVersion: 2
+  defaultReferences: []
+  executionOrder: 0
+  icon: {instanceID: 0}
+  userData: 
+  assetBundleName: 
+  assetBundleVariant: 

+ 13 - 6
Unity/Assets/Scripts/Core/Process/Process.cs

@@ -1,5 +1,4 @@
 using System;
-using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Threading;
 
@@ -8,8 +7,10 @@ namespace ET
     public class Process: IDisposable
     {
         public int Id { get; private set; }
-        
-        public Barrier Barrier { get; set; }
+
+        public bool IsRuning;
+
+        public ISingletonScheduler Scheduler;
 
         public Process(int id)
         {
@@ -17,11 +18,11 @@ namespace ET
 
             this.loop = (_) =>
             {
-                this.Init();
+                this.Register();
                 this.Update();
                 this.LateUpdate();
                 this.FrameFinishUpdate();
-                this.Barrier?.RemoveParticipant();
+                this.IsRuning = false;
             };
         }
 
@@ -37,8 +38,10 @@ namespace ET
         
         private readonly WaitCallback loop;
 
-        private void Init()
+        private void Register()
         {
+            this.IsRuning = true;
+            
             foreach (IProcessSingleton singleton in this.singletons)
             {
                 singleton.Register();
@@ -190,6 +193,8 @@ namespace ET
                 ETTask task = frameFinishTask.Dequeue();
                 task.SetResult();
             }
+            
+            this.IsRuning = false;
         }
 
         public void Dispose()
@@ -204,6 +209,8 @@ namespace ET
             this.Id = 0;
             
             Game.Instance.Remove(id);
+
+            this.IsRuning = false;
             
             // 顺序反过来清理
             while (singletons.Count > 0)

+ 26 - 16
Unity/Assets/Scripts/Loader/MonoBehaviour/Init.cs

@@ -1,5 +1,5 @@
 using System;
-using System.Threading.Tasks;
+using System.Collections.Generic;
 using CommandLine;
 using UnityEngine;
 
@@ -7,10 +7,18 @@ namespace ET
 {
 	public class Init: MonoBehaviour
 	{
-		private Process process;
+		public static Init Instance { get; private set; }
+
+		public ThreadSynchronizationContext ThreadSynchronizationContext = new();
+
+		public bool IsStart;
+		
+		public Process Process;
 		
 		private void Start()
 		{
+			Instance = this;
+			
 			DontDestroyOnLoad(gameObject);
 			
 			AppDomain.CurrentDomain.UnhandledException += (sender, e) =>
@@ -24,9 +32,10 @@ namespace ET
 				.WithNotParsed(error => throw new Exception($"命令行格式错误! {error}"))
 				.WithParsed(Game.Instance.AddSingleton);
 			Game.Instance.AddSingleton<Logger>().ILog = new UnityLogger();
+			Game.Instance.AddSingleton<UnityScheduler>();
+			
+			Process process = Game.Instance.Create();
 			
-			process = Game.Instance.Create(false);
-				
 			process.AddSingleton<MainThreadSynchronizationContext>();
 
 			process.AddSingleton<GlobalComponent>();
@@ -39,34 +48,35 @@ namespace ET
 			process.AddSingleton<EventSystem>();
 			process.AddSingleton<TimerComponent>();
 			process.AddSingleton<CoroutineLockComponent>();
+			
+			UnityScheduler.Instance.Add(process);
 
 			ETTask.ExceptionHandler += Log.Error;
 
 			process.AddSingleton<CodeLoader>().Start();
-
-			Task.Run(() =>
-			{
-				while (true)
-				{
-					Game.Instance.Loop();
-				}
-			});
 		}
 
 		private void Update()
 		{
-			process.Update();
+			this.ThreadSynchronizationContext.Update();
+
+			if (!this.IsStart)
+			{
+				return;
+			}
+			
+			this.Process.Update();
 		}
 
 		private void LateUpdate()
 		{
-			process.LateUpdate();
-			process.FrameFinishUpdate();
+			this.Process.LateUpdate();
+			this.Process.FrameFinishUpdate();
 		}
 
 		private void OnApplicationQuit()
 		{
-			this.process.Dispose();
+			Game.Instance.Dispose();
 		}
 	}
 	

+ 42 - 0
Unity/Assets/Scripts/Loader/UnityScheduler.cs

@@ -0,0 +1,42 @@
+using System.Collections.Generic;
+
+namespace ET
+{
+    public class UnityScheduler: Singleton<UnityScheduler>, ISingletonScheduler
+    {
+        public bool IsStart;
+
+        private Process process;
+
+        public void StartScheduler()
+        {
+            Init.Instance.ThreadSynchronizationContext.Post(() => { Init.Instance.IsStart = true; });
+        }
+
+        public void StopScheduler()
+        {
+            Init.Instance.ThreadSynchronizationContext.Post(() => { Init.Instance.IsStart = false; });
+            
+            // Process Loop完成
+            while (true)
+            {
+                if (process.IsRuning)
+                {
+                    break;
+                }
+            }
+        }
+
+        public void Add(Process process)
+        {
+            this.process = process;
+            Init.Instance.ThreadSynchronizationContext.Post(()=>{ Init.Instance.Process = process; });
+        }
+        
+        public void Remove(Process process)
+        {
+            this.process = null;
+            Init.Instance.ThreadSynchronizationContext.Post(()=>{ Init.Instance.Process = null; });
+        }
+    }
+}

+ 11 - 0
Unity/Assets/Scripts/Loader/UnityScheduler.cs.meta

@@ -0,0 +1,11 @@
+fileFormatVersion: 2
+guid: e6b7b7ad555485d43855104539376d94
+MonoImporter:
+  externalObjects: {}
+  serializedVersion: 2
+  defaultReferences: []
+  executionOrder: 0
+  icon: {instanceID: 0}
+  userData: 
+  assetBundleName: 
+  assetBundleVariant: