Răsfoiți Sursa

RpcChannel重命名为RpcClient
isStop标志放到RpcCommunicator类

tanghai 14 ani în urmă
părinte
comite
788a280c5d

+ 3 - 3
Cpp/Platform/Rpc/CMakeLists.txt

@@ -7,7 +7,7 @@ SET(RpcSrc
 	RpcController.cc
 	RequestHandler.cc
 	ResponseHandler.cc
-	RpcChannel.cc
+	RpcClient.cc
 	RpcServer.cc
 	RpcSession.cc
 	${proto_srcs}
@@ -17,13 +17,13 @@ SET(RpcSrc
 ADD_LIBRARY(Rpc ${RpcSrc})
 
 ADD_EXECUTABLE(RpcCommunicatorTest RpcCommunicatorTest.cc)
-ADD_EXECUTABLE(RpcChannelTest RpcChannelTest.cc)
+ADD_EXECUTABLE(RpcClientTest RpcClientTest.cc)
 ADD_EXECUTABLE(RpcSessionTest RpcSessionTest.cc)
 ADD_EXECUTABLE(RpcServerTest RpcServerTest.cc)
 
 SET(Excutes 
 	RpcCommunicatorTest
-	RpcChannelTest
+	RpcClientTest
 	RpcSessionTest
 	RpcServerTest
 )

+ 8 - 18
Cpp/Platform/Rpc/RpcChannel.cc → Cpp/Platform/Rpc/RpcClient.cc

@@ -4,12 +4,12 @@
 #include <google/protobuf/message.h>
 #include <google/protobuf/descriptor.h>
 #include "Rpc/RpcCommunicator.h"
-#include "Rpc/RpcChannel.h"
+#include "Rpc/RpcClient.h"
 #include "Rpc/RequestHandler.h"
 
 namespace Egametang {
 
-RpcChannel::RpcChannel(boost::asio::io_service& ioService, std::string host, int port):
+RpcClient::RpcClient(boost::asio::io_service& ioService, std::string host, int port):
 		RpcCommunicator(ioService), id(0)
 {
 	// another thread?
@@ -17,15 +17,15 @@ RpcChannel::RpcChannel(boost::asio::io_service& ioService, std::string host, int
 	address.from_string(host);
 	boost::asio::ip::tcp::endpoint endpoint(address, port);
 	socket.async_connect(endpoint,
-			boost::bind(&RpcChannel::OnAsyncConnect, this,
+			boost::bind(&RpcClient::OnAsyncConnect, this,
 					boost::asio::placeholders::error));
 }
 
-RpcChannel::~RpcChannel()
+RpcClient::~RpcClient()
 {
 }
 
-void RpcChannel::OnAsyncConnect(const boost::system::error_code& err)
+void RpcClient::OnAsyncConnect(const boost::system::error_code& err)
 {
 	if (err)
 	{
@@ -37,7 +37,7 @@ void RpcChannel::OnAsyncConnect(const boost::system::error_code& err)
 	RecvMeta(recvMeta, recvMessage);
 }
 
-void RpcChannel::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
+void RpcClient::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
 {
 	RequestHandlerPtr requestHandler = requestHandlers[meta->id];
 	requestHandlers.erase(meta->id);
@@ -52,21 +52,11 @@ void RpcChannel::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
 	requestHandler->Run();
 }
 
-void RpcChannel::OnSendMessage(RpcMetaPtr meta, StringPtr message)
+void RpcClient::OnSendMessage(RpcMetaPtr meta, StringPtr message)
 {
 }
 
-void RpcChannel::Stop()
-{
-	if (isStopped)
-	{
-		return;
-	}
-	isStopped = true;
-	socket.close();
-}
-
-void RpcChannel::CallMethod(
+void RpcClient::CallMethod(
 		const google::protobuf::MethodDescriptor* method,
 		google::protobuf::RpcController* controller,
 		const google::protobuf::Message* request,

+ 7 - 9
Cpp/Platform/Rpc/RpcChannel.h → Cpp/Platform/Rpc/RpcClient.h

@@ -1,5 +1,5 @@
-#ifndef RPC_RPCCHANNEL_H
-#define RPC_RPCCHANNEL_H
+#ifndef RPC_RPCCLIENT_H
+#define RPC_RPCCLIENT_H
 
 #include <google/protobuf/service.h>
 #include <boost/unordered_map.hpp>
@@ -13,16 +13,15 @@ namespace Egametang {
 
 class RpcHandler;
 
-class RpcChannel:
+class RpcClient:
 	public google::protobuf::RpcChannel, public RpcCommunicator,
-	public boost::enable_shared_from_this<RpcChannel>
+	public boost::enable_shared_from_this<RpcClient>
 {
 private:
 	typedef boost::unordered_map<std::size_t, RequestHandlerPtr> RequestHandlerMap;
 
 	std::size_t id;
 	RequestHandlerMap requestHandlers;
-	bool isStopped;
 
 	void OnAsyncConnect(const boost::system::error_code& err);
 
@@ -30,9 +29,8 @@ private:
 	virtual void OnSendMessage(RpcMetaPtr meta, StringPtr message);
 
 public:
-	RpcChannel(boost::asio::io_service& service, std::string host, int port);
-	~RpcChannel();
-	virtual void Stop();
+	RpcClient(boost::asio::io_service& service, std::string host, int port);
+	~RpcClient();
 	virtual void CallMethod(
 			const google::protobuf::MethodDescriptor* method,
 			google::protobuf::RpcController* controller,
@@ -43,4 +41,4 @@ public:
 
 } // namespace Egametang
 
-#endif // RPC_RPCCHANNEL_H
+#endif // RPC_RPCCLIENT_H

+ 9 - 9
Cpp/Platform/Rpc/RpcChannelTest.cc → Cpp/Platform/Rpc/RpcClientTest.cc

@@ -1,7 +1,7 @@
 #include <gtest/gtest.h>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
-#include "Rpc/RpcChannel.h"
+#include "Rpc/RpcClient.h"
 #include "Thread/CountBarrier.h"
 #include "Thread/ThreadPool.h"
 #include "Rpc/RpcController.h"
@@ -77,16 +77,16 @@ public:
 	}
 };
 
-class RpcChannelTest: public testing::Test
+class RpcClientTest: public testing::Test
 {
 protected:
 	int port;
 
 public:
-	RpcChannelTest(): port(10002)
+	RpcClientTest(): port(10002)
 	{
 	}
-	virtual ~RpcChannelTest()
+	virtual ~RpcClientTest()
 	{
 	}
 };
@@ -96,15 +96,15 @@ static void IOServiceRun(boost::asio::io_service* ioService)
 	ioService->run();
 }
 
-TEST_F(RpcChannelTest, Echo)
+TEST_F(RpcClientTest, Echo)
 {
 	boost::asio::io_service ioServer;
 	boost::asio::io_service ioClient;
 
 	CountBarrier barrier(2);
 	RpcServerTest server(ioServer, port, barrier);
-	RpcChannelPtr channel(new RpcChannel(ioClient, "127.0.0.1", port));
-	EchoService_Stub service(channel.get());
+	RpcClientPtr client(new RpcClient(ioClient, "127.0.0.1", port));
+	EchoService_Stub service(client.get());
 
 	ThreadPool threadPool(2);
 	threadPool.Schedule(boost::bind(&IOServiceRun, &ioServer));
@@ -119,11 +119,11 @@ TEST_F(RpcChannelTest, Echo)
 	service.Echo(NULL, &request, &response,
 			google::protobuf::NewCallback(&barrier, &CountBarrier::Signal));
 	barrier.Wait();
-	channel->Stop();
+	client->Stop();
 	server.Stop();
 	ioServer.stop();
 	ioClient.stop();
-	// rpc_channel是个无限循环的操作, 必须主动让channel和server stop才能wait线程
+	// 必须主动让client和server stop才能wait线程
 	threadPool.Wait();
 
 	ASSERT_EQ(100, response.num());

+ 11 - 1
Cpp/Platform/Rpc/RpcCommunicator.cc

@@ -7,12 +7,17 @@
 namespace Egametang {
 
 RpcCommunicator::RpcCommunicator(boost::asio::io_service& service):
-		ioService(service), socket(service)
+		isStopped(false), ioService(service), socket(service)
 {
 }
 
 RpcCommunicator::~RpcCommunicator()
 {
+	if (isStopped)
+	{
+		return;
+	}
+	socket.close();
 }
 
 boost::asio::ip::tcp::socket& RpcCommunicator::Socket()
@@ -22,6 +27,11 @@ boost::asio::ip::tcp::socket& RpcCommunicator::Socket()
 
 void RpcCommunicator::Stop()
 {
+	if (isStopped)
+	{
+		return;
+	}
+	isStopped = true;
 	socket.close();
 }
 

+ 1 - 0
Cpp/Platform/Rpc/RpcCommunicator.h

@@ -37,6 +37,7 @@ struct RpcMeta
 class RpcCommunicator: public boost::noncopyable
 {
 private:
+	bool isStopped;
 	boost::asio::io_service& ioService;
 	boost::asio::ip::tcp::socket socket;
 

+ 7 - 7
Cpp/Platform/Rpc/RpcServerTest.cc

@@ -7,7 +7,7 @@
 #include <google/protobuf/service.h>
 #include "Thread/CountBarrier.h"
 #include "Thread/ThreadPool.h"
-#include "Rpc/RpcChannel.h"
+#include "Rpc/RpcClient.h"
 #include "Rpc/RpcServer.h"
 #include "Rpc/RpcSession.h"
 #include "Rpc/RpcServerMock.h"
@@ -56,7 +56,7 @@ public:
 	}
 };
 
-TEST_F(RpcServerTest, ChannelAndServer)
+TEST_F(RpcServerTest, ClientAndServer)
 {
 	ThreadPool threadPool(2);
 
@@ -67,8 +67,8 @@ TEST_F(RpcServerTest, ChannelAndServer)
 	server->Register(echoSevice);
 	ASSERT_EQ(1U, server->methods.size());
 
-	RpcChannelPtr channel(new RpcChannel(ioClient, "127.0.0.1", port));
-	EchoService_Stub service(channel.get());
+	RpcClientPtr client(new RpcClient(ioClient, "127.0.0.1", port));
+	EchoService_Stub service(client.get());
 
 	// 定义消息
 	EchoRequest request;
@@ -86,14 +86,14 @@ TEST_F(RpcServerTest, ChannelAndServer)
 	barrier.Wait();
 
 	// 加入到io线程
-	ioClient.post(boost::bind(&RpcChannel::Stop, channel));
+	ioClient.post(boost::bind(&RpcClient::Stop, client));
 	ioServer.post(boost::bind(&RpcServer::Stop, server));
 
-	// 加入任务队列,等channel和server stop,io_service才stop
+	// 加入任务队列,等client和server stop,io_service才stop
 	ioClient.post(boost::bind(&boost::asio::io_service::stop, &ioClient));
 	ioServer.post(boost::bind(&boost::asio::io_service::stop, &ioServer));
 
-	// rpc_channel是个无限循环的操作, 必须主动让channel和server stop才能wait线程
+	// 必须主动让client和server stop才能wait线程
 	threadPool.Wait();
 
 	ASSERT_EQ(100, response.num());

+ 1 - 7
Cpp/Platform/Rpc/RpcSession.cc

@@ -6,13 +6,12 @@
 namespace Egametang {
 
 RpcSession::RpcSession(boost::asio::io_service& ioService, RpcServer& server):
-		RpcCommunicator(ioService), rpcServer(server), isStopped(false)
+		RpcCommunicator(ioService), rpcServer(server)
 {
 }
 
 RpcSession::~RpcSession()
 {
-	socket.close();
 }
 
 void RpcSession::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
@@ -38,11 +37,6 @@ void RpcSession::Start()
 
 void RpcSession::Stop()
 {
-	if (isStopped)
-	{
-		return;
-	}
-	isStopped = true;
 	RpcSessionPtr session = shared_from_this();
 	rpcServer.Remove(session);
 }

+ 0 - 1
Cpp/Platform/Rpc/RpcSession.h

@@ -15,7 +15,6 @@ class RpcSession: public RpcCommunicator, public boost::enable_shared_from_this<
 {
 private:
 	RpcServer& rpcServer;
-	bool isStopped;
 
 	virtual void OnRecvMessage(RpcMetaPtr meta, StringPtr message);
 	virtual void OnSendMessage(RpcMetaPtr meta, StringPtr message);

+ 2 - 2
Cpp/Platform/Rpc/RpcTypedef.h

@@ -14,7 +14,7 @@ typedef boost::shared_ptr<google::protobuf::Message> RpcMessagePtr;
 // rpc
 class RpcServer;
 class RpcSession;
-class RpcChannel;
+class RpcClient;
 class RequestHandler;
 class MethodInfo;
 class RpcMeta;
@@ -22,7 +22,7 @@ class ResponseHandler;
 
 typedef boost::shared_ptr<RpcServer> 	              RpcServerPtr;
 typedef boost::shared_ptr<RpcSession>                 RpcSessionPtr;
-typedef boost::shared_ptr<RpcChannel>                 RpcChannelPtr;
+typedef boost::shared_ptr<RpcClient>                  RpcClientPtr;
 typedef boost::shared_ptr<MethodInfo>                 MethodInfoPtr;
 typedef boost::shared_ptr<RpcMeta> 	                  RpcMetaPtr;
 typedef boost::shared_ptr<RequestHandler>             RequestHandlerPtr;