Просмотр исходного кода

RpcServer自带一个io_service

tanghai 14 лет назад
Родитель
Сommit
fce4a8970f

+ 1 - 1
Src/CMakeLists.txt

@@ -3,7 +3,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
 SET(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/Cmake)
 SET(CMAKE_ALLOW_LOOSE_LOOP_CONSTRUCTS ON)
 
-ADD_DEFINITIONS(-DDEBUG)
+ADD_DEFINITIONS(-g)
 
 PROJECT(Cpp)
 

+ 2 - 0
Src/Egametang/Rpc/RpcChannel.cc

@@ -58,7 +58,9 @@ void RpcChannel::OnSendMessage(RpcMetaPtr meta, StringPtr message)
 
 void RpcChannel::Stop()
 {
+	VLOG(2) << __FUNCTION__;
 	RpcCommunicator::Stop();
+	VLOG(2) << __FUNCTION__ << "End";
 }
 
 void RpcChannel::CallMethod(

+ 2 - 0
Src/Egametang/Rpc/RpcCommunicator.cc

@@ -22,7 +22,9 @@ boost::asio::ip::tcp::socket& RpcCommunicator::Socket()
 
 void RpcCommunicator::Stop()
 {
+	VLOG(2) << __FUNCTION__;
 	socket.close();
+	VLOG(2) << __FUNCTION__ << "End";
 }
 
 

+ 14 - 3
Src/Egametang/Rpc/RpcServer.cc

@@ -13,8 +13,8 @@
 
 namespace Egametang {
 
-RpcServer::RpcServer(boost::asio::io_service& service, int port):
-		io_service(service), acceptor(io_service),
+RpcServer::RpcServer(int port):
+		io_service(), acceptor(io_service),
 		thread_pool(), sessions(),
 		methods()
 {
@@ -37,6 +37,7 @@ RpcServer::~RpcServer()
 
 void RpcServer::OnAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err)
 {
+	VLOG(2) << __FUNCTION__;
 	if (err)
 	{
 		LOG(ERROR) << "accept fail: " << err.message();
@@ -57,15 +58,25 @@ void RpcServer::OnCallMethod(RpcSessionPtr session, ResponseHandlerPtr response_
 			boost::bind(&ResponseHandler::Run, response_handler));
 }
 
+void RpcServer::Start()
+{
+	io_service.run();
+}
+
 void RpcServer::Stop()
 {
+	VLOG(2) << __FUNCTION__;
 	thread_pool.Wait();
 	acceptor.close();
+	VLOG(2) << "session size: " << sessions.size();
 	foreach(RpcSessionPtr session, sessions)
 	{
-		session->Stop();
+		VLOG(2) << "session stop";
+		sessions.erase(session);
 	}
 	CHECK_EQ(0U, sessions.size());
+	io_service.stop();
+	VLOG(2) << __FUNCTION__ << " End";
 }
 
 void RpcServer::RunService(RpcSessionPtr session, RpcMetaPtr meta,

+ 3 - 2
Src/Egametang/Rpc/RpcServer.h

@@ -17,7 +17,7 @@ private:
 	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
 	typedef boost::unordered_map<std::size_t, MethodInfoPtr> MethodMap;
 
-	boost::asio::io_service& io_service;
+	boost::asio::io_service io_service;
 	boost::asio::ip::tcp::acceptor acceptor;
 	ThreadPool thread_pool;
 	RpcSessionSet sessions;
@@ -27,13 +27,14 @@ private:
 	void OnCallMethod(RpcSessionPtr session, ResponseHandlerPtr response_handler);
 
 public:
-	RpcServer(boost::asio::io_service& io_service, int port);
+	RpcServer(int port);
 	virtual ~RpcServer();
 
 	virtual void RunService(RpcSessionPtr session, RpcMetaPtr meta,
 			StringPtr message, MessageHandler handler);
 	virtual void Register(RpcServicePtr service);
 	virtual void Remove(RpcSessionPtr& session);
+	virtual void Start();
 	virtual void Stop();
 };
 

+ 1 - 2
Src/Egametang/Rpc/RpcServerMock.h

@@ -14,8 +14,7 @@ namespace Egametang {
 class RpcServerMock: public RpcServer
 {
 public:
-	RpcServerMock(boost::asio::io_service& io_service, int port):
-		RpcServer(io_service, port)
+	RpcServerMock(int port): RpcServer(port)
 	{
 	}
 

+ 7 - 8
Src/Egametang/Rpc/RpcServerTest.cc

@@ -36,18 +36,18 @@ public:
 
 static void IOServiceRun(boost::asio::io_service* io_service)
 {
+	VLOG(2) << __FUNCTION__;
 	io_service->run();
 }
 
 class RpcServerTest: public testing::Test
 {
 protected:
-	boost::asio::io_service io_server;
 	boost::asio::io_service io_client;
 	int port;
 
 public:
-	RpcServerTest(): io_server(), io_client(), port(10003)
+	RpcServerTest(): io_client(), port(10003)
 	{
 	}
 
@@ -61,31 +61,30 @@ TEST_F(RpcServerTest, ChannelAndServer)
 	ThreadPool thread_pool(2);
 
 	RpcServicePtr echo_sevice(new MyEcho);
-	RpcServer server(io_server, port);
+	RpcServer server(port);
 	server.Register(echo_sevice);
 	ASSERT_EQ(1U, server.methods.size());
 
 	RpcChannel channel(io_client, "127.0.0.1", port);
 	EchoService_Stub service(&channel);
 
-	thread_pool.Schedule(boost::bind(&IOServiceRun, &io_server));
-	thread_pool.Schedule(boost::bind(&IOServiceRun, &io_client));
-
 	EchoRequest request;
 	request.set_num(100);
 	EchoResponse response;
 	ASSERT_EQ(0U, response.num());
 
+	thread_pool.Schedule(boost::bind(&RpcServer::Start, &server));
+	thread_pool.Schedule(boost::bind(&IOServiceRun, &io_client));
+
 	CountBarrier barrier;
 	service.Echo(NULL, &request, &response,
 			google::protobuf::NewCallback(&barrier, &CountBarrier::Signal));
 	barrier.Wait();
-	VLOG(2) << "response: \n" << response.DebugString();
 
 	channel.Stop();
 	server.Stop();
 	io_client.stop();
-	io_server.stop();
+
 	// rpc_channel是个无限循环的操作, 必须主动让channel和server stop才能wait线程
 	thread_pool.Wait();
 

+ 3 - 1
Src/Egametang/Rpc/RpcSession.cc

@@ -1,4 +1,5 @@
 #include <boost/bind.hpp>
+#include <glog/logging.h>
 #include "Rpc/RpcSession.h"
 #include "Rpc/RpcServer.h"
 
@@ -11,6 +12,7 @@ RpcSession::RpcSession(boost::asio::io_service& io_service, RpcServer& server):
 
 RpcSession::~RpcSession()
 {
+	RpcCommunicator::Stop();
 }
 
 void RpcSession::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
@@ -36,7 +38,7 @@ void RpcSession::Start()
 
 void RpcSession::Stop()
 {
-	RpcCommunicator::Stop();
+	VLOG(2) << __FUNCTION__;
 	RpcSessionPtr session = shared_from_this();
 	rpc_server.Remove(session);
 }

+ 1 - 1
Src/Egametang/Rpc/RpcSession.h

@@ -23,7 +23,7 @@ public:
 	RpcSession(boost::asio::io_service& io_service, RpcServer& server);
 	~RpcSession();
 	void Start();
-	void Stop();
+	virtual void Stop();
 };
 
 } // namespace Egametang

+ 1 - 1
Src/Egametang/Rpc/RpcSessionTest.cc

@@ -19,7 +19,7 @@ protected:
 public:
 	RpcSessionTest():
 		io_service(), port(10000),
-		mock_server(io_service, port), session(io_service, mock_server)
+		mock_server(port), session(io_service, mock_server)
 	{
 	}
 

+ 0 - 1
Src/Egametang/Thread/ThreadPool.cc

@@ -33,7 +33,6 @@ void ThreadPool::Wait()
 	{
 		done.wait(lock);
 	}
-	running = true;
 }
 
 void ThreadPool::Runner()