Просмотр исходного кода

修改了channel和server stop方式,应该由stop的人来决定在哪个线程去stop
RpcServerTest还有点问题,未完全跑过

tanghai 14 лет назад
Родитель
Сommit
7fcf5af733

+ 1 - 8
Cpp/Platform/Rpc/RpcChannel.cc

@@ -56,16 +56,9 @@ void RpcChannel::OnSendMessage(RpcMetaPtr meta, StringPtr message)
 {
 }
 
-void RpcChannel::HandleStop()
-{
-	socket.close();
-}
-
 void RpcChannel::Stop()
 {
-	// 把socket.close()调度到io_service线程,
-	// 防止两个线程同时操作socket
-	io_service.post(boost::bind(&RpcChannel::HandleStop, shared_from_this()));
+	socket.close();
 }
 
 void RpcChannel::CallMethod(

+ 0 - 2
Cpp/Platform/Rpc/RpcChannel.h

@@ -28,8 +28,6 @@ private:
 	virtual void OnRecvMessage(RpcMetaPtr meta, StringPtr message);
 	virtual void OnSendMessage(RpcMetaPtr meta, StringPtr message);
 
-	void HandleStop();
-
 public:
 	RpcChannel(boost::asio::io_service& service, std::string host, int port);
 	~RpcChannel();

+ 1 - 0
Cpp/Platform/Rpc/RpcCommunicator.cc

@@ -40,6 +40,7 @@ void RpcCommunicator::RecvMessage(RpcMetaPtr meta, StringPtr message,
 	if (err)
 	{
 		LOG(ERROR) << "receive message size failed: " << err.message();
+		VLOG(2) << "meta: " << meta->ToString() << " message: " << *message;
 		Stop();
 		return;
 	}

+ 2 - 8
Cpp/Platform/Rpc/RpcServer.cc

@@ -57,17 +57,11 @@ void RpcServer::OnCallMethod(RpcSessionPtr session, ResponseHandlerPtr response_
 			boost::bind(&ResponseHandler::Run, response_handler));
 }
 
-void RpcServer::HandleStop()
-{
-	acceptor.close();
-	sessions.clear();
-}
-
 void RpcServer::Stop()
 {
 	thread_pool.Wait();
-	// 调度到io_service线程,防止两个线程竞争
-	io_service.post(boost::bind(&RpcServer::HandleStop, shared_from_this()));
+	acceptor.close();
+	sessions.clear();
 }
 
 void RpcServer::RunService(RpcSessionPtr session, RpcMetaPtr meta,

+ 0 - 1
Cpp/Platform/Rpc/RpcServer.h

@@ -25,7 +25,6 @@ private:
 
 	void OnAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err);
 	void OnCallMethod(RpcSessionPtr session, ResponseHandlerPtr response_handler);
-	void HandleStop();
 
 public:
 	RpcServer(boost::asio::io_service& service, int port);

+ 4 - 4
Cpp/Platform/Rpc/RpcServerTest.cc

@@ -85,10 +85,10 @@ TEST_F(RpcServerTest, ChannelAndServer)
 			google::protobuf::NewCallback(&barrier, &CountBarrier::Signal));
 	barrier.Wait();
 
-	channel->Stop();
-	server->Stop();
-	io_client.stop();
-	io_server.stop();
+	io_client.post(boost::bind(&RpcChannel::Stop, channel));
+	io_server.post(boost::bind(&RpcServer::Stop, server));
+	io_client.post(boost::bind(&boost::asio::io_service::stop, &io_client));
+	io_server.post(boost::bind(&boost::asio::io_service::stop, &io_server));
 
 	// rpc_channel是个无限循环的操作, 必须主动让channel和server stop才能wait线程
 	thread_pool.Wait();