Explorar el Código

thread_pool build successfully!

tanghai hace 15 años
padre
commit
faa702e4ac
Se han modificado 6 ficheros con 130 adiciones y 20 borrados
  1. 2 0
      SConstruct
  2. 1 0
      src/SConscript
  3. 16 0
      src/thread/SConscript
  4. 38 15
      src/thread/thread_pool.cc
  5. 17 5
      src/thread/thread_pool.h
  6. 56 0
      src/thread/thread_pool_test.cc

+ 2 - 0
SConstruct

@@ -18,6 +18,8 @@ if env['mode'] == 'dbg':
 else:
 	env.Append(CCFLAGS='-O2 -g')
 
+env.Append(CPPPATH=Dir(env['mode']).abspath)
+
 Export('env')
 
 SConscript('src/SConscript', build_dir=env['mode'])

+ 1 - 0
src/SConscript

@@ -2,6 +2,7 @@ Import('env')
 
 subdirs = [
 	'experimental',
+	'thread',
 ]
 
 for subdir in subdirs:

+ 16 - 0
src/thread/SConscript

@@ -0,0 +1,16 @@
+Import('env')
+
+thread_env = env.Clone()
+
+thread_env.Append(LIBS=[
+	'gflags',
+	'glog',
+	'gtest',
+	'boost_thread',
+])
+
+thread_env.Program('thread_pool_test', 
+[
+	'thread_pool_test.cc',
+	'thread_pool.cc',
+])

+ 38 - 15
src/thread/thread_pool.cc

@@ -1,49 +1,67 @@
+#include <boost/foreach.hpp>
 #include "thread/thread_pool.h"
 
 namespace hainan
 {
-	ThreadPool::ThreadPool(int32_t n):
-			num(n), running(false)
+	ThreadPool::ThreadPool():
+			num(0), work_num(0),
+			running(false)
 	{
 	}
+	ThreadPool::~ThreadPool()
+	{
+		Stop();
+	}
 
 	void ThreadPool::Start()
 	{
+		running = true;
 		for(int i = 0; i < num; ++i)
 		{
-			boost::thread t(boost::function(&ThreadPool::Loop, boost::ref(this)));
+			shared_ptr<thread> t(new thread(bind(&ThreadPool::Loop, ref(this))));
 			threads.push_back(t);
-			t.detach();
+			t->detach();
 		}
-		running = true;
+		work_num = num;
 	}
 
 	void ThreadPool::Stop()
 	{
+		mutex::scoped_lock lock(mtx);
 		running = false;
 		cond.notify_all();
+		while(work_num != 0)
+		{
+			done.wait(lock);
+		}
 	}
 
 	void ThreadPool::Loop()
 	{
 		while(running)
 		{
-			mutex.lock();
-			while(tasks.empty())
+			function<void (void)> task;
 			{
-				cond.wait(mutex);
+				mutex::scoped_lock lock(mtx);
+				while(tasks.empty())
+				{
+					cond.wait(lock);
+				}
+				task = tasks.front();
+				tasks.pop_front();
+				cond.notify_one();
 			}
-			boost::function& t = tasks.front();
-			tasks.pop_front();
-			cond.notify_one();
-			mutex.unlock();
-			t();
+			task();
+		}
+		if(__sync_sub_and_fetch(&work_num, 1) == 0)
+		{
+			done.notify_one();
 		}
 	}
 
-	bool ThreadPool::PushTask(boost::function<void(void)> task)
+	bool ThreadPool::PushTask(function<void (void)> task)
 	{
-		boost::mutex::scoped_lock(&mutex);
+		mutex::scoped_lock lock(mtx);
 		if(!running)
 		{
 			return false;
@@ -52,4 +70,9 @@ namespace hainan
 		cond.notify_one();
 		return true;
 	}
+
+	void ThreadPool::SetNum(int32_t n)
+	{
+		num = n;
+	}
 }

+ 17 - 5
src/thread/thread_pool.h

@@ -1,18 +1,28 @@
+#ifndef THREAD_THREAD_POOL_H
+#define THREAD_THREAD_POOL_H
+
+#include <vector>
+#include <list>
 #include <boost/thread.hpp>
 #include <boost/function.hpp>
 #include <boost/bind.hpp>
+#include <boost/smart_ptr.hpp>
 
 namespace hainan
 {
+	using namespace boost;
+	using namespace std;
 	class ThreadPool
 	{
 	private:
 		int32_t num;
+		volatile int32_t work_num;
 		volatile bool running;
-		boost::mutex mutex;
-		boost::condition_variable cond;
-		std::vector<boost::thread> threads;
-		std::list<boost::function<void(void)>> tasks;
+		mutex mtx;
+		condition_variable cond;
+		condition_variable done;
+		list<shared_ptr<thread> > threads;
+		list<function<void (void)> > tasks;
 
 		void Loop();
 
@@ -23,6 +33,8 @@ namespace hainan
 		~ThreadPool();
 		void Start();
 		void Stop();
-		bool PushTask(boost::function<void(void)> task);
+		void SetNum(int32_t n);
+		bool PushTask(function<void(void)> task);
 	};
 }
+#endif  // THREAD_THREAD_POOL_H

+ 56 - 0
src/thread/thread_pool_test.cc

@@ -0,0 +1,56 @@
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include "thread/thread_pool.h"
+
+namespace hainan
+{
+	class ThreadPoolTest: public testing::Test
+	{
+		void SetUp()
+		{
+			thread_pool.SetNum(4);
+			thread_pool.Start();
+		}
+		void TearDown()
+		{
+		}
+	protected:
+		ThreadPool thread_pool;
+		boost::function<int(int)> func;
+	public:
+		void Max(int a, int b, int* z)
+		{
+			LOG(INFO) << a << " " << b;
+			*z = a > b? a : b;
+		}
+	};
+
+	TEST_F(ThreadPoolTest, Test1)
+	{
+		int x = 5;
+		int y = 6;
+		int z = 0;
+		int a = 7;
+		int b = 8;
+		int c = 0;
+		thread_pool.PushTask(
+				boost::bind(&ThreadPoolTest::Max, boost::ref(*this), x, y, &z));
+		thread_pool.PushTask(
+				boost::bind(&ThreadPoolTest::Max, boost::ref(*this), a, b, &c));
+		thread_pool.Stop();
+		ASSERT_EQ(6, z);
+		ASSERT_EQ(8, c);
+	}
+}
+
+int main(int argc, char* argv[])
+{
+	FLAGS_logtostderr = true;
+	google::ParseCommandLineFlags(&argc, &argv, true);
+	google::InitGoogleLogging(argv[0]);
+	testing::InitGoogleTest(&argc, argv);
+	return RUN_ALL_TESTS();
+}