Răsfoiți Sursa

RpcServerTest通过,C++ Rpc实现基本完成!

tanghai 14 ani în urmă
părinte
comite
d0ff23a7e2

+ 8 - 3
Src/Egametang/Rpc/RpcChannel.cc

@@ -56,11 +56,16 @@ void RpcChannel::OnSendMessage(RpcMetaPtr meta, StringPtr message)
 {
 }
 
+void RpcChannel::HandleStop()
+{
+	socket.close();
+}
+
 void RpcChannel::Stop()
 {
-	VLOG(2) << __FUNCTION__;
-	RpcCommunicator::Stop();
-	VLOG(2) << __FUNCTION__ << "End";
+	// 把socket.close()调度到io_service线程,
+	// 防止两个线程同时操作socket
+	io_service.post(boost::bind(&RpcChannel::HandleStop, shared_from_this()));
 }
 
 void RpcChannel::CallMethod(

+ 6 - 1
Src/Egametang/Rpc/RpcChannel.h

@@ -4,6 +4,7 @@
 #include <google/protobuf/service.h>
 #include <boost/unordered_map.hpp>
 #include <boost/asio.hpp>
+#include <boost/thread.hpp>
 #include "Base/Typedef.h"
 #include "Rpc/RpcTypedef.h"
 #include "Rpc/RpcCommunicator.h"
@@ -12,7 +13,9 @@ namespace Egametang {
 
 class RpcHandler;
 
-class RpcChannel: public google::protobuf::RpcChannel, public RpcCommunicator
+class RpcChannel:
+	public google::protobuf::RpcChannel, public RpcCommunicator,
+	public boost::enable_shared_from_this<RpcChannel>
 {
 private:
 	typedef boost::unordered_map<std::size_t, RequestHandlerPtr> RequestHandlerMap;
@@ -25,6 +28,8 @@ private:
 	virtual void OnRecvMessage(RpcMetaPtr meta, StringPtr message);
 	virtual void OnSendMessage(RpcMetaPtr meta, StringPtr message);
 
+	void HandleStop();
+
 public:
 	RpcChannel(boost::asio::io_service& service, std::string host, int port);
 	~RpcChannel();

+ 5 - 5
Src/Egametang/Rpc/RpcChannelTest.cc

@@ -102,9 +102,9 @@ TEST_F(RpcChannelTest, Echo)
 	boost::asio::io_service io_client;
 
 	CountBarrier barrier(2);
-	RpcServerTest rpc_server(io_server, port, barrier);
-	RpcChannel rpc_channel(io_client, "127.0.0.1", port);
-	EchoService_Stub service(&rpc_channel);
+	RpcServerTest server(io_server, port, barrier);
+	RpcChannelPtr channel(new RpcChannel(io_client, "127.0.0.1", port));
+	EchoService_Stub service(channel.get());
 	
 	ThreadPool thread_pool(2);
 	thread_pool.Schedule(boost::bind(&IOServiceRun, &io_server));
@@ -119,8 +119,8 @@ TEST_F(RpcChannelTest, Echo)
 	service.Echo(NULL, &request, &response,
 			google::protobuf::NewCallback(&barrier, &CountBarrier::Signal));
 	barrier.Wait();
-	rpc_channel.Stop();
-	rpc_server.Stop();
+	channel->Stop();
+	server.Stop();
 	io_server.stop();
 	io_client.stop();
 	// rpc_channel是个无限循环的操作, 必须主动让channel和server stop才能wait线程

+ 0 - 2
Src/Egametang/Rpc/RpcCommunicator.cc

@@ -22,9 +22,7 @@ boost::asio::ip::tcp::socket& RpcCommunicator::Socket()
 
 void RpcCommunicator::Stop()
 {
-	VLOG(2) << __FUNCTION__;
 	socket.close();
-	VLOG(2) << __FUNCTION__ << "End";
 }
 
 

+ 8 - 17
Src/Egametang/Rpc/RpcServer.cc

@@ -13,8 +13,8 @@
 
 namespace Egametang {
 
-RpcServer::RpcServer(int port):
-		io_service(), acceptor(io_service),
+RpcServer::RpcServer(boost::asio::io_service& service, int port):
+		io_service(service), acceptor(io_service),
 		thread_pool(), sessions(),
 		methods()
 {
@@ -37,7 +37,6 @@ RpcServer::~RpcServer()
 
 void RpcServer::OnAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err)
 {
-	VLOG(2) << __FUNCTION__;
 	if (err)
 	{
 		LOG(ERROR) << "accept fail: " << err.message();
@@ -53,30 +52,22 @@ void RpcServer::OnAsyncAccept(RpcSessionPtr session, const boost::system::error_
 
 void RpcServer::OnCallMethod(RpcSessionPtr session, ResponseHandlerPtr response_handler)
 {
-	// push到网络线程
+	// 调度到网络线程
 	session->Socket().get_io_service().post(
 			boost::bind(&ResponseHandler::Run, response_handler));
 }
 
-void RpcServer::Start()
+void RpcServer::HandleStop()
 {
-	io_service.run();
+	acceptor.close();
+	sessions.clear();
 }
 
 void RpcServer::Stop()
 {
-	VLOG(2) << __FUNCTION__;
 	thread_pool.Wait();
-	acceptor.close();
-	VLOG(2) << "session size: " << sessions.size();
-	foreach(RpcSessionPtr session, sessions)
-	{
-		VLOG(2) << "session stop";
-		sessions.erase(session);
-	}
-	CHECK_EQ(0U, sessions.size());
-	io_service.stop();
-	VLOG(2) << __FUNCTION__ << " End";
+	// 调度到io_service线程,防止两个线程竞争
+	io_service.post(boost::bind(&RpcServer::HandleStop, shared_from_this()));
 }
 
 void RpcServer::RunService(RpcSessionPtr session, RpcMetaPtr meta,

+ 4 - 4
Src/Egametang/Rpc/RpcServer.h

@@ -11,13 +11,13 @@
 
 namespace Egametang {
 
-class RpcServer: public boost::noncopyable
+class RpcServer: public boost::noncopyable, public boost::enable_shared_from_this<RpcServer>
 {
 private:
 	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
 	typedef boost::unordered_map<std::size_t, MethodInfoPtr> MethodMap;
 
-	boost::asio::io_service io_service;
+	boost::asio::io_service& io_service;
 	boost::asio::ip::tcp::acceptor acceptor;
 	ThreadPool thread_pool;
 	RpcSessionSet sessions;
@@ -25,16 +25,16 @@ private:
 
 	void OnAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err);
 	void OnCallMethod(RpcSessionPtr session, ResponseHandlerPtr response_handler);
+	void HandleStop();
 
 public:
-	RpcServer(int port);
+	RpcServer(boost::asio::io_service& service, int port);
 	virtual ~RpcServer();
 
 	virtual void RunService(RpcSessionPtr session, RpcMetaPtr meta,
 			StringPtr message, MessageHandler handler);
 	virtual void Register(RpcServicePtr service);
 	virtual void Remove(RpcSessionPtr& session);
-	virtual void Start();
 	virtual void Stop();
 };
 

+ 2 - 1
Src/Egametang/Rpc/RpcServerMock.h

@@ -14,7 +14,8 @@ namespace Egametang {
 class RpcServerMock: public RpcServer
 {
 public:
-	RpcServerMock(int port): RpcServer(port)
+	RpcServerMock(boost::asio::io_service& service, int port):
+		RpcServer(service, port)
 	{
 	}
 

+ 15 - 10
Src/Egametang/Rpc/RpcServerTest.cc

@@ -36,7 +36,6 @@ public:
 
 static void IOServiceRun(boost::asio::io_service* io_service)
 {
-	VLOG(2) << __FUNCTION__;
 	io_service->run();
 }
 
@@ -44,10 +43,11 @@ class RpcServerTest: public testing::Test
 {
 protected:
 	boost::asio::io_service io_client;
+	boost::asio::io_service io_server;
 	int port;
 
 public:
-	RpcServerTest(): io_client(), port(10003)
+	RpcServerTest(): io_client(), io_server(), port(10003)
 	{
 	}
 
@@ -61,19 +61,23 @@ TEST_F(RpcServerTest, ChannelAndServer)
 	ThreadPool thread_pool(2);
 
 	RpcServicePtr echo_sevice(new MyEcho);
-	RpcServer server(port);
-	server.Register(echo_sevice);
-	ASSERT_EQ(1U, server.methods.size());
 
-	RpcChannel channel(io_client, "127.0.0.1", port);
-	EchoService_Stub service(&channel);
+	RpcServerPtr server(new RpcServer(io_server, port));
+	// 注册service
+	server->Register(echo_sevice);
+	ASSERT_EQ(1U, server->methods.size());
 
+	RpcChannelPtr channel(new RpcChannel(io_client, "127.0.0.1", port));
+	EchoService_Stub service(channel.get());
+
+	// 定义消息
 	EchoRequest request;
 	request.set_num(100);
 	EchoResponse response;
 	ASSERT_EQ(0U, response.num());
 
-	thread_pool.Schedule(boost::bind(&RpcServer::Start, &server));
+	// server和client分别在两个不同的线程
+	thread_pool.Schedule(boost::bind(&IOServiceRun, &io_server));
 	thread_pool.Schedule(boost::bind(&IOServiceRun, &io_client));
 
 	CountBarrier barrier;
@@ -81,9 +85,10 @@ TEST_F(RpcServerTest, ChannelAndServer)
 			google::protobuf::NewCallback(&barrier, &CountBarrier::Signal));
 	barrier.Wait();
 
-	channel.Stop();
-	server.Stop();
+	channel->Stop();
+	server->Stop();
 	io_client.stop();
+	io_server.stop();
 
 	// rpc_channel是个无限循环的操作, 必须主动让channel和server stop才能wait线程
 	thread_pool.Wait();

+ 1 - 2
Src/Egametang/Rpc/RpcSession.cc

@@ -12,7 +12,7 @@ RpcSession::RpcSession(boost::asio::io_service& io_service, RpcServer& server):
 
 RpcSession::~RpcSession()
 {
-	RpcCommunicator::Stop();
+	socket.close();
 }
 
 void RpcSession::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
@@ -38,7 +38,6 @@ void RpcSession::Start()
 
 void RpcSession::Stop()
 {
-	VLOG(2) << __FUNCTION__;
 	RpcSessionPtr session = shared_from_this();
 	rpc_server.Remove(session);
 }

+ 1 - 1
Src/Egametang/Rpc/RpcSessionTest.cc

@@ -19,7 +19,7 @@ protected:
 public:
 	RpcSessionTest():
 		io_service(), port(10000),
-		mock_server(port), session(io_service, mock_server)
+		mock_server(io_service, port), session(io_service, mock_server)
 	{
 	}