Просмотр исходного кода

类成员生命周期都由类自己控制,在析构的时候释放。
提供类似Stop()之类的函数去主动释放类里面的成员是不对的

tanghai 13 лет назад
Родитель
Сommit
2cc79be1f5

+ 15 - 24
Cpp/Platform/Rpc/RpcClientTest.cc

@@ -9,6 +9,8 @@
 
 namespace Egametang {
 
+static int globalPort = 10000;
+
 class RpcServerTest: public RpcCommunicator
 {
 public:
@@ -32,6 +34,11 @@ public:
 				boost::bind(&RpcServerTest::OnAsyncAccept, this,
 						boost::asio::placeholders::error));
 	}
+	~RpcServerTest()
+	{
+		acceptor.close();
+		socket.close();
+	}
 
 	void OnAsyncAccept(const boost::system::error_code& err)
 	{
@@ -44,12 +51,6 @@ public:
 		RecvMeta(meta, message);
 	}
 
-	void Stop()
-	{
-		acceptor.close();
-		socket.close();
-	}
-
 	virtual void OnRecvMessage(RpcMetaPtr meta, StringPtr message)
 	{
 		EchoRequest request;
@@ -67,6 +68,7 @@ public:
 		responseMeta->size = responseMessage->size();
 		SendMeta(responseMeta, responseMessage);
 	}
+
 	virtual void OnSendMessage(RpcMetaPtr meta, StringPtr message)
 	{
 		barrier.Signal();
@@ -75,16 +77,6 @@ public:
 
 class RpcClientTest: public testing::Test
 {
-protected:
-	int port;
-
-public:
-	RpcClientTest(): port(10002)
-	{
-	}
-	virtual ~RpcClientTest()
-	{
-	}
 };
 
 static void IOServiceRun(boost::asio::io_service* ioService)
@@ -98,12 +90,14 @@ TEST_F(RpcClientTest, Echo)
 	boost::asio::io_service ioClient;
 
 	CountBarrier barrier(2);
-	RpcServerTest server(ioServer, port, barrier);
-	auto client = boost::make_shared<RpcClient>(ioClient, "127.0.0.1", port);
+	RpcServerTest server(ioServer, globalPort, barrier);
+	auto client = boost::make_shared<RpcClient>(ioClient, "127.0.0.1", globalPort);
 	EchoService_Stub service(client.get());
 
 	boost::threadpool::fifo_pool threadPool(2);
 	threadPool.schedule(boost::bind(&IOServiceRun, &ioServer));
+
+	boost::this_thread::sleep(boost::posix_time::milliseconds(500));
 	threadPool.schedule(boost::bind(&IOServiceRun, &ioClient));
 
 	EchoRequest request;
@@ -115,13 +109,10 @@ TEST_F(RpcClientTest, Echo)
 	service.Echo(NULL, &request, &response,
 			google::protobuf::NewCallback(&barrier, &CountBarrier::Signal));
 	barrier.Wait();
-	client->Stop();
-	server.Stop();
-	ioServer.stop();
-	ioClient.stop();
-	// 必须主动让client和server stop才能wait线程
-	threadPool.wait();
 
+	// 加入任务队列,等client和server stop,io_service才stop
+	ioClient.post(boost::bind(&boost::asio::io_service::stop, &ioClient));
+	ioServer.post(boost::bind(&boost::asio::io_service::stop, &ioServer));
 	ASSERT_EQ(100, response.num());
 }
 

+ 0 - 10
Cpp/Platform/Rpc/RpcCommunicator.cc

@@ -12,10 +12,6 @@ RpcCommunicator::RpcCommunicator(boost::asio::io_service& service):
 
 RpcCommunicator::~RpcCommunicator()
 {
-	if (isStopped)
-	{
-		return;
-	}
 	socket.close();
 }
 
@@ -26,12 +22,6 @@ boost::asio::ip::tcp::socket& RpcCommunicator::Socket()
 
 void RpcCommunicator::Stop()
 {
-	if (isStopped)
-	{
-		return;
-	}
-	isStopped = true;
-	socket.close();
 }
 
 

+ 1 - 2
Cpp/Platform/Rpc/RpcCommunicator.h

@@ -39,13 +39,12 @@ protected:
 	boost::asio::io_service& ioService;
 	boost::asio::ip::tcp::socket socket;
 
+	virtual void Stop();
 public:
 	explicit RpcCommunicator(boost::asio::io_service& io_service);
 	virtual ~RpcCommunicator();
 	boost::asio::ip::tcp::socket& Socket();
 
-	virtual void Stop();
-
 	// recieve response
 	void RecvMeta(RpcMetaPtr meta, StringPtr message);
 	void RecvMessage(RpcMetaPtr meta, StringPtr message, const boost::system::error_code& err);

+ 10 - 28
Cpp/Platform/Rpc/RpcCommunicatorTest.cc

@@ -74,9 +74,6 @@ public:
 
 		barrier.Signal();
 	}
-	virtual void OnSendMessage()
-	{
-	}
 };
 
 class RpcClientTest: public RpcCommunicator
@@ -135,46 +132,31 @@ public:
 		recvMeta = *meta;
 		barrier.Signal();
 	}
-
-	virtual void OnSendMessage()
-	{
-	}
 };
 
 class RpcCommunicatorTest: public testing::Test
 {
-protected:
-	boost::asio::io_service ioServer;
-	boost::asio::io_service ioClient;
-	CountBarrier barrier;
-	RpcServerTest rpcServer;
-	RpcClientTest rpcClient;
-
-public:
-	RpcCommunicatorTest():
-		ioServer(), ioClient(),
-		barrier(2), rpcServer(ioServer, globalPort, barrier),
-		rpcClient(ioClient, globalPort, barrier)
-	{
-	}
-
-	virtual ~RpcCommunicatorTest()
-	{
-	}
 };
 
 
 TEST_F(RpcCommunicatorTest, SendAndRecvString)
 {
+	boost::asio::io_service ioServer;
+	boost::asio::io_service ioClient;
+	CountBarrier barrier(2);
+	RpcServerTest rpcServer(ioServer, globalPort, barrier);
+	RpcClientTest rpcClient(ioClient, globalPort, barrier);
+
 	boost::threadpool::fifo_pool threadPool(2);
 	threadPool.schedule(boost::bind(&RpcServerTest::Start, &rpcServer));
 
-	boost::this_thread::sleep(boost::posix_time::milliseconds(2000));
+	boost::this_thread::sleep(boost::posix_time::milliseconds(500));
 	threadPool.schedule(boost::bind(&RpcClientTest::Start, &rpcClient));
 	barrier.Wait();
 	threadPool.wait();
-	rpcServer.Stop();
-	rpcClient.Stop();
+
+	ioClient.post(boost::bind(&boost::asio::io_service::stop, &ioClient));
+	ioServer.post(boost::bind(&boost::asio::io_service::stop, &ioServer));
 
 	ASSERT_EQ(std::string("send test rpc communicator string"), rpcServer.recvMessage);
 	ASSERT_EQ(rpcServer.recvMeta.size, rpcServer.recvMessage.size());

+ 3 - 8
Cpp/Platform/Rpc/RpcServer.cc

@@ -32,6 +32,8 @@ RpcServer::RpcServer(boost::asio::io_service& service, int port):
 
 RpcServer::~RpcServer()
 {
+	threadPool.wait();
+	acceptor.close();
 }
 
 void RpcServer::OnAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err)
@@ -55,13 +57,6 @@ void RpcServer::OnCallMethod(RpcSessionPtr session, ResponseHandlerPtr responseH
 			boost::bind(&ResponseHandler::Run, responseHandler));
 }
 
-void RpcServer::Stop()
-{
-	threadPool.wait();
-	acceptor.close();
-	sessions.clear();
-}
-
 void RpcServer::RunService(RpcSessionPtr session, RpcMetaPtr meta,
 		StringPtr message, MessageHandler messageHandler)
 {
@@ -96,7 +91,7 @@ void RpcServer::Register(ProtobufServicePtr service)
 	}
 }
 
-void RpcServer::Remove(RpcSessionPtr& session)
+void RpcServer::Remove(RpcSessionPtr session)
 {
 	sessions.erase(session);
 }

+ 1 - 2
Cpp/Platform/Rpc/RpcServer.h

@@ -36,8 +36,7 @@ public:
 	virtual void RunService(RpcSessionPtr session, RpcMetaPtr meta,
 			StringPtr message, MessageHandler messageHandler);
 	virtual void Register(ProtobufServicePtr service);
-	virtual void Remove(RpcSessionPtr& session);
-	virtual void Stop();
+	virtual void Remove(RpcSessionPtr session);
 };
 
 } // namespace Egametang

+ 7 - 19
Cpp/Platform/Rpc/RpcServerTest.cc

@@ -16,6 +16,8 @@
 
 namespace Egametang {
 
+static int globalPort = 10003;
+
 class MyEcho: public EchoService
 {
 public:
@@ -42,19 +44,6 @@ static void IOServiceRun(boost::asio::io_service* ioService)
 class RpcServerTest: public testing::Test
 {
 protected:
-	boost::asio::io_service ioClient;
-	boost::asio::io_service ioServer;
-	int port;
-
-public:
-	RpcServerTest(): ioClient(), ioServer(), port(10003)
-	{
-	}
-
-	virtual ~RpcServerTest()
-	{
-	}
-
 	MethodMap& GetMethodMap(RpcServerPtr server)
 	{
 		return server->methods;
@@ -63,16 +52,19 @@ public:
 
 TEST_F(RpcServerTest, ClientAndServer)
 {
+	boost::asio::io_service ioClient;
+	boost::asio::io_service ioServer;
+
 	boost::threadpool::fifo_pool threadPool(2);
 
 	auto echoSevice = boost::make_shared<MyEcho>();
 
-	auto server = boost::make_shared<RpcServer>(ioServer, port);
+	auto server = boost::make_shared<RpcServer>(ioServer, globalPort);
 	// 注册service
 	server->Register(echoSevice);
 	ASSERT_EQ(1U, GetMethodMap(server).size());
 
-	auto client = boost::make_shared<RpcClient>(ioClient, "127.0.0.1", port);
+	auto client = boost::make_shared<RpcClient>(ioClient, "127.0.0.1", globalPort);
 	EchoService_Stub service(client.get());
 
 	// 定义消息
@@ -92,10 +84,6 @@ TEST_F(RpcServerTest, ClientAndServer)
 			google::protobuf::NewCallback(&barrier, &CountBarrier::Signal));
 	barrier.Wait();
 
-	// 加入到io线程
-	ioClient.post(boost::bind(&RpcClient::Stop, client));
-	ioServer.post(boost::bind(&RpcServer::Stop, server));
-
 	// 加入任务队列,等client和server stop,io_service才stop
 	ioClient.post(boost::bind(&boost::asio::io_service::stop, &ioClient));
 	ioServer.post(boost::bind(&boost::asio::io_service::stop, &ioServer));

+ 6 - 3
Cpp/Platform/Rpc/RpcSession.cc

@@ -37,11 +37,14 @@ void RpcSession::Start()
 
 void RpcSession::Stop()
 {
-	RpcSessionPtr session = shared_from_this();
-
+	if (isStopped)
+	{
+		return;
+	}
+	isStopped = true;
 	// 延迟删除,必须等所有的bind执行完成后才能remove,
 	// 否则会出现this指针失效的问题
-	ioService.post(boost::bind(&RpcServer::Remove, &rpcServer, session));
+	ioService.post(boost::bind(&RpcServer::Remove, &rpcServer, shared_from_this()));
 }
 
 } // namespace Egametang