Преглед изворни кода

不使用boost和std名字空间,修复了scons的一个bug,
添加了connection.cc的实现

tanghai пре 15 година
родитељ
комит
555db194c1

+ 4 - 2
src/net/SConscript

@@ -3,11 +3,13 @@ Import('env')
 net_env = env.Clone()
 net_env = env.Clone()
 
 
 net_src = [
 net_src = [
-	'epoller.cc'
+	'asyn_server.cc',
+	'connection.cc',
 ]
 ]
 
 
 net_lib = net_env.StaticLibrary('net', net_src)
 net_lib = net_env.StaticLibrary('net', net_src)
 
 
 net_env.Append(LIBS=net_lib)
 net_env.Append(LIBS=net_lib)
 
 
-net_env.Program('epoller_test.cc')
+net_env.Program('asyn_server_test.cc')
+net_env.Program('connection_test.cc')

+ 28 - 10
src/net/asyn_server.cc

@@ -5,34 +5,52 @@
 
 
 namespace hainan {
 namespace hainan {
 
 
-explicit asyn_server::asyn_server(
-		const string & address, const string & port):
-		io_service(), acceptor(io_service),
-		new_connection(new connection())
+explicit asyn_server::asyn_server(const string& address, const string& port) :
+	io_service_(), acceptor_(io_service_),
+	new_connection_(new connection(io_service_, connections_))
 {
 {
+	boost::asio::ip::tcp::resolver resolver(io_service_);
+	boost::asio::ip::tcp::resolver::query query(address, port);
+	boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
+	acceptor_.open(endpoint.protocol());
+	acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
+	acceptor_.bind(endpoint);
+	acceptor_.listen();
+	acceptor_.async_accept(new_connection_->socket(),
+			boost::bind(&server::handle_accept, this,
+					asio::placeholders::error));
 }
 }
 
 
-void asyn_server::handle_accept(const system::error_code & e)
+void asyn_server::handle_accept(const system::error_code& e)
 {
 {
+	if (!e)
+	{
+		connections_.insert(new_connection_);
+		new_connection_.reset(new connection(io_service_, connections_));
+		acceptor_.async_accept(new_connection_->socket(),
+				boost::bind(&server::handle_accept, this,
+						asio::placeholders::error));
+	}
 }
 }
 
 
 void asyn_server::handle_stop()
 void asyn_server::handle_stop()
 {
 {
-	acceptor.close();
-	foreach(connection_ptr connection, all_connections)
+	acceptor_.close();
+	foreach(connection_ptr connection, connections_)
 	{
 	{
-		connection.stop();
+		connection->stop();
 	}
 	}
+	connections_.clear();
 }
 }
 
 
 void asyn_server::start()
 void asyn_server::start()
 {
 {
-	io_service.run();
+	io_service_.run();
 }
 }
 
 
 void asyn_server::stop()
 void asyn_server::stop()
 {
 {
-	io_service.post(bind(&asyn_server::handle_stop(), this));
+	io_service_.post(boost::bind(&asyn_server::handle_stop(), this));
 }
 }
 
 
 } // namespace hainan
 } // namespace hainan

+ 5 - 9
src/net/asyn_server.h

@@ -4,22 +4,18 @@
 #include <string>
 #include <string>
 #include <boost/asio.hpp>
 #include <boost/asio.hpp>
 #include <boost/noncopyable.hpp>
 #include <boost/noncopyable.hpp>
+#include "net/connection.h"
 
 
 namespace hainan {
 namespace hainan {
 
 
-using namespace std;
-using namespace boost;
-
 class asyn_server: private noncopyable
 class asyn_server: private noncopyable
 {
 {
 private:
 private:
-	typedef unordered_set<connection> connection_set;
-
 	// hold all connection
 	// hold all connection
-	connectionSet all_connections;
-	asio::io_service io_service;
-	asio::ip::tcp::acceptor acceptor;
-	connection_ptr new_connection;
+	connection_set connections_;
+	boost::asio::io_service io_service_;
+	boost::asio::ip::tcp::acceptor acceptor_;
+	connection_ptr new_connection_;
 
 
 	void handle_accept(const system::error_code& e);
 	void handle_accept(const system::error_code& e);
 	void handle_stop();
 	void handle_stop();

+ 31 - 0
src/net/connection.cc

@@ -0,0 +1,31 @@
+#include <vector>
+#include <boost/bind.hpp>
+#include "net/connection.h"
+
+namespace hainan {
+
+connection::connection(boost::asio::io_service& io_service,
+		connection_set& manager):
+	socket_(io_service), connections_(manager)
+{
+}
+
+boost::asio::ip::tcp::socket& connection::socket()
+{
+	return socket_;
+}
+
+void connection::start()
+{
+	socket_.async_read_some(boost::asio::buffer(buffer_),
+			boost::bind(&connection::handle_read, shared_from_this(),
+					boost::asio::placeholders::error,
+					boost::asio::placeholders::bytes_transferred));
+}
+
+void connection::stop()
+{
+	socket_.close();
+}
+
+} // namespace hainan

+ 14 - 11
src/net/connection.h

@@ -6,28 +6,31 @@
 #include <boost/array.hpp>
 #include <boost/array.hpp>
 #include <boost/noncopyable.hpp>
 #include <boost/noncopyable.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
 
 
 namespace hainan {
 namespace hainan {
 
 
-using namespace std;
-using namespace boost;
+typedef boost::unordered_set<connection_ptr> connection_set;
 
 
-class connection: private noncopyable
+class connection: private boost::noncopyable,
+		public boost::enable_shared_from_this<connection>
 {
 {
 private:
 private:
-	typedef unordered_set<connection> connection_set;
 
 
-	asio::ip::tcp::socket socket;
-	connection_set& all_connections;
-	boost::array<char, 8192> buffer;
+	boost::asio::ip::tcp::socket socket_;
+	connection_set& connections_;
+	boost::array<char, 8192> buffer_;
 public:
 public:
-	explicit connection(asio::io_service& io_service);
+	explicit connection::connection(boost::asio::io_service& io_service,
+			connection_set& manager);
+	boost::asio::ip::tcp::socket& socket();
 	void start();
 	void start();
 	void stop();
 	void stop();
-	virtual void handle_read();
-	virtual void handle_writer();
+	virtual void handle_read(const boost::system::error_code& e,
+			size_t bytes_transferred) = 0;
+	virtual void handle_writer(const boost::system::error_code& e) = 0;
 };
 };
-typedef shared_ptr<connection> connection_ptr;
+typedef boost::shared_ptr<connection> connection_ptr;
 
 
 } // namespace hainan
 } // namespace hainan
 
 

+ 44 - 0
src/net/connection_test.cc

@@ -0,0 +1,44 @@
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+#include "net/connection.h"
+
+namespace hainan {
+
+class connection1: public connection
+{
+public:
+	string content;
+	size_t bytes;
+	void handle_read(const system::error_code& e,
+			size_t bytes_transferred)
+	{
+		if (!e)
+		{
+			content = buffer_.c_array();
+			bytes = bytes_transferred;
+		}
+	}
+
+	void handle_write()
+	{
+
+	}
+};
+
+class ConnectionTest: public testing::Test
+{
+private:
+	asio::io_service io_service_;
+	connection_set connections_;
+
+	void SetUp()
+	{
+	}
+};
+
+TEST_F(ConnectionTest, Test1)
+{
+
+}
+
+} // namespace hainan

+ 32 - 31
src/thread/thread_pool.cc

@@ -4,7 +4,7 @@
 namespace hainan {
 namespace hainan {
 
 
 thread_pool::thread_pool() :
 thread_pool::thread_pool() :
-	num(0), running(false), work_num(0)
+	num_(0), running_(false), work_num_(0)
 {
 {
 }
 }
 thread_pool::~thread_pool()
 thread_pool::~thread_pool()
@@ -13,26 +13,27 @@ thread_pool::~thread_pool()
 
 
 void thread_pool::start()
 void thread_pool::start()
 {
 {
-	running = true;
-	for (int i = 0; i < num; ++i)
+	running_ = true;
+	for (int i = 0; i < num_; ++i)
 	{
 	{
-		thread_ptr t(new thread(bind(&thread_pool::runner, this)));
-		threads.push_back(t);
+		thread_ptr t(new boost::thread(
+				boost::bind(&thread_pool::runner, this)));
+		threads_.push_back(t);
 		t->detach();
 		t->detach();
 	}
 	}
-	work_num = num;
+	work_num_ = num_;
 }
 }
 
 
 void thread_pool::stop()
 void thread_pool::stop()
 {
 {
 	VLOG(3)<< "Stop";
 	VLOG(3)<< "Stop";
-	mutex::scoped_lock lock(mtx);
-	running = false;
-	cond.notify_all();
-	while(work_num > 0)
+	boost::mutex::scoped_lock lock(mutex_);
+	running_ = false;
+	cond_.notify_all();
+	while(work_num_ > 0)
 	{
 	{
-		VLOG(3) << "done tasks size = " << tasks.size();
-		done.wait(lock);
+		VLOG(3) << "done tasks size = " << tasks_.size();
+		done_.wait(lock);
 	}
 	}
 }
 }
 
 
@@ -42,26 +43,26 @@ void thread_pool::runner()
 	bool continued = true;
 	bool continued = true;
 	while(continued)
 	while(continued)
 	{
 	{
-		function<void (void)> task;
+		boost::function<void (void)> task;
 		{
 		{
 			VLOG(3) << "loop lock";
 			VLOG(3) << "loop lock";
-			mutex::scoped_lock lock(mtx);
+			boost::mutex::scoped_lock lock(mutex_);
 			VLOG(3) << "loop lock ok";
 			VLOG(3) << "loop lock ok";
-			while(running && tasks.empty())
+			while(running_ && tasks_.empty())
 			{
 			{
-				cond.wait(lock);
+				cond_.wait(lock);
 				VLOG(3) << "cond";
 				VLOG(3) << "cond";
 			}
 			}
-			if(!tasks.empty())
+			if(!tasks_.empty())
 			{
 			{
 				VLOG(3) << "fetch task";
 				VLOG(3) << "fetch task";
-				task = tasks.front();
-				tasks.pop_front();
+				task = tasks_.front();
+				tasks_.pop_front();
 			}
 			}
-			continued = running || !tasks.empty();
+			continued = running_ || !tasks_.empty();
 			VLOG(3) << "continued = " << continued
 			VLOG(3) << "continued = " << continued
-			<< "running = " << running
-			<< " tasks size = " << tasks.size();
+			<< "running = " << running_
+			<< " tasks size = " << tasks_.size();
 			VLOG(3) << "loop unlock";
 			VLOG(3) << "loop unlock";
 		}
 		}
 
 
@@ -70,32 +71,32 @@ void thread_pool::runner()
 			task();
 			task();
 		}
 		}
 	}
 	}
-	if(__sync_sub_and_fetch(&work_num, 1) == 0)
+	if(__sync_sub_and_fetch(&work_num_, 1) == 0)
 	{
 	{
-		VLOG(3) << "work_num = " << work_num;
-		done.notify_one();
+		VLOG(3) << "work_num = " << work_num_;
+		done_.notify_one();
 	}
 	}
 }
 }
 
 
-bool thread_pool::push_task(function<void (void)> task)
+bool thread_pool::push_task(boost::function<void (void)> task)
 {
 {
 	VLOG(3) << "push task";
 	VLOG(3) << "push task";
 	{
 	{
-		mutex::scoped_lock lock(mtx);
-		if(!running)
+		boost::mutex::scoped_lock lock(mutex_);
+		if(!running_)
 		{
 		{
 			return false;
 			return false;
 		}
 		}
-		tasks.push_back(task);
+		tasks_.push_back(task);
 	}
 	}
 	VLOG(3) << "push task unlock";
 	VLOG(3) << "push task unlock";
-	cond.notify_one();
+	cond_.notify_one();
 	return true;
 	return true;
 }
 }
 
 
 void thread_pool::set_num(int n)
 void thread_pool::set_num(int n)
 {
 {
-	num = n;
+	num_ = n;
 }
 }
 
 
 } // namespace hainan
 } // namespace hainan

+ 11 - 14
src/thread/thread_pool.h

@@ -10,22 +10,19 @@
 
 
 namespace hainan {
 namespace hainan {
 
 
-using namespace std;
-using namespace boost;
+typedef boost::shared_ptr<boost::thread> thread_ptr;
 
 
-typedef shared_ptr<thread> thread_ptr;
-
-class thread_pool: private noncopyable
+class thread_pool: private boost::noncopyable
 {
 {
 private:
 private:
-	int num;
-	volatile int work_num;
-	volatile bool running;
-	mutex mtx;
-	condition_variable cond;
-	condition_variable done;
-	list<thread_ptr> threads;
-	list<function<void(void)> > tasks;
+	int num_;
+	volatile int work_num_;
+	volatile bool running_;
+	boost::mutex mutex_;
+	boost::condition_variable cond_;
+	boost::condition_variable done_;
+	std::list<thread_ptr> threads_;
+	std::list<boost::function<void(void)> > tasks_;
 
 
 	void runner();
 	void runner();
 public:
 public:
@@ -34,7 +31,7 @@ public:
 	void start();
 	void start();
 	void stop();
 	void stop();
 	void set_num(int n);
 	void set_num(int n);
-	bool push_task(function<void(void)> task);
+	bool push_task(boost::function<void(void)> task);
 };
 };
 } // namespace hainan
 } // namespace hainan
 #endif  // THREAD_THREAD_POOL_H
 #endif  // THREAD_THREAD_POOL_H

+ 3 - 3
src/thread/thread_pool_test.cc

@@ -32,9 +32,9 @@ public:
 
 
 TEST_F(thread_pool_test, Test1)
 TEST_F(thread_pool_test, Test1)
 {
 {
-	vector<int> x(100, 8);
-	vector<int> y(100, 9);
-	vector<int> z(100, 0);
+	std::vector<int> x(100, 8);
+	std::vector<int> y(100, 9);
+	std::vector<int> z(100, 0);
 	for (int i = 0; i < 100; ++i)
 	for (int i = 0; i < 100; ++i)
 	{
 	{
 		pool.push_task(
 		pool.push_task(