tanghai 14 лет назад
Родитель
Сommit
fb7dde1d38

+ 9 - 3
Src/Egametang/Rpc/CMakeLists.txt

@@ -1,16 +1,22 @@
-PROTOBUF_GENERATE_CPP(proto_srcs proto_hdrs RpcProtobufData.proto)
+PROTOBUF_GENERATE_CPP(proto_srcs proto_hdrs 
+	RpcProtobufData.proto 
+	RpcChannelTest.proto
+	)
 
 SET(RpcSrc 
-	RPCCommunicator.cc
+	RpcCommunicator.cc
+	RpcChannel.cc
 	${proto_srcs}
 	)
 
 ADD_LIBRARY(Rpc ${RpcSrc})
 
-ADD_EXECUTABLE(RpcCommunicatorTest RPCCommunicatorTest.cc)
+ADD_EXECUTABLE(RpcCommunicatorTest RpcCommunicatorTest.cc)
+ADD_EXECUTABLE(RpcChannelTest RpcChannelTest.cc)
 
 SET(Excutes 
 	RpcCommunicatorTest
+	RpcChannelTest
 )
 
 FOREACH(Excute ${Excutes})

+ 3 - 3
Src/Egametang/Rpc/RpcChannel.cc

@@ -5,7 +5,7 @@
 
 namespace Egametang {
 
-RpcChannel::RpcChannel(boost::asio::io_service& io_service, std::string& host, int port):
+RpcChannel::RpcChannel(boost::asio::io_service& io_service, std::string host, int port):
 		io_service_(io_service)
 {
 	// another thread?
@@ -34,13 +34,13 @@ void RpcChannel::OnRecvMessage(StringPtr ss)
 	RpcHandlerPtr handler = handlers_[response.id()];
 	handler->GetResponse()->ParseFromString(response.response());
 
+	handlers_.erase(response.id());
+
 	if (handler->GetDone() != NULL)
 	{
 		handler->GetDone()->Run();
 	}
 
-	handlers_.erase(response.id());
-
 	// read size
 	RecvSize();
 }

+ 2 - 2
Src/Egametang/Rpc/RpcChannel.h

@@ -14,7 +14,7 @@ class RpcHandler;
 
 class RpcChannel:
 		public google::protobuf::RpcChannel,
-		public RPCCommunicator
+		public RpcCommunicator
 {
 private:
 	typedef boost::unordered_map<int32, RpcHandlerPtr> RpcCallbackMap;
@@ -30,7 +30,7 @@ private:
 	virtual void OnSendMessage();
 
 public:
-	RpcChannel(boost::asio::io_service& service, std::string& host, int port);
+	RpcChannel(boost::asio::io_service& service, std::string host, int port);
 	~RpcChannel();
 	virtual void CallMethod(
 			const google::protobuf::MethodDescriptor* method,

+ 46 - 24
Src/Egametang/Rpc/RpcChannelTest.cc

@@ -5,19 +5,19 @@
 
 namespace Egametang {
 
-static int port = 10001;
+static int global_port = 10002;
 
-class RPCServerTest: public RPCCommunicator
+class RpcServerTest: public RpcCommunicator
 {
 public:
 	CountBarrier& barrier_;
-	std::string recv_string_;
+	int32 num_;
 	boost::asio::ip::tcp::acceptor acceptor_;
 
 public:
-	RPCServerTest(boost::asio::io_service& io_service, int port, CountBarrier& barrier):
-		RPCCommunicator(io_service), acceptor_(io_service),
-		barrier_(barrier)
+	RpcServerTest(boost::asio::io_service& io_service, int port, CountBarrier& barrier):
+		RpcCommunicator(io_service), acceptor_(io_service),
+		barrier_(barrier), num_(0)
 	{
 		boost::asio::ip::address address;
 		address.from_string("127.0.0.1");
@@ -27,7 +27,7 @@ public:
 		acceptor_.bind(endpoint);
 		acceptor_.listen();
 		acceptor_.async_accept(socket_,
-				boost::bind(&RPCServerTest::OnAsyncAccept, this,
+				boost::bind(&RpcServerTest::OnAsyncAccept, this,
 						boost::asio::placeholders::error));
 	}
 
@@ -41,12 +41,6 @@ public:
 		RecvSize();
 	}
 
-	void Start()
-	{
-		VLOG(2) << "Start Server";
-		io_service_.run();
-	}
-
 	void Stop()
 	{
 		acceptor_.close();
@@ -56,8 +50,25 @@ public:
 	virtual void OnRecvMessage(StringPtr ss)
 	{
 		VLOG(2) << "Server Recv string: " << *ss;
-		recv_string_ = *ss;
-		std::string send_string("response test rpc communicator string");
+
+		// 接收消息
+		RpcRequest rpc_request;
+		rpc_request.ParseFromString(*ss);
+		RpcChannelTestRequest request;
+		request.ParseFromString(rpc_request.request());
+
+		num_ = request.num();
+
+		// 回一个消息
+		RpcResponse rpc_response;
+		rpc_response.set_id(rpc_request.id());
+		rpc_response.set_type(ResponseType::RESPONSE_TYPE_OK);
+
+		RpcChannelTestResponse response;
+		response.set_num(num_);
+		rpc_response.set_response(response.SerializeAsString());
+
+		std::string send_string = rpc_response.SerializeAsString();
 		SendSize(send_string.size(), send_string);
 		barrier_.Signal();
 	}
@@ -69,12 +80,13 @@ public:
 class RpcChannelTest: public testing::Test
 {
 private:
-	boost::asio::io_service io_server_;
-	boost::asio::io_service io_client_;
-	RPCServerTest rpc_server_;
+	CountBarrier barrier_;
+	boost::asio::io_service io_service_;
+	RpcServerTest rpc_server_;
 
 public:
-	RpcChannelTest(): rpc_server_()
+	RpcChannelTest():
+		rpc_server_(io_service_, global_port, barrier_)
 	{
 	}
 };
@@ -82,13 +94,23 @@ public:
 
 TEST_F(RpcChannelTest, CallMethod)
 {
-	RpcServerTest server(io_service_, port);
-	ASSERT_EQ(0, server.size);
+	RpcChannel rpc_channel(io_service_, "127.0.0.1", global_port);
+
+	RpcChannelTestService service(rpc_channel);
+
+	RpcChannelTestRequest request;
+	request.set_num(100);
+	RpcChannelTestResponse response;
+	RpcController controller;
+
+	ASSERT_EQ(0, response.num());
+	service.Echo(&controller, &request, &response,
+			google::protobuf::NewCallback(&barrier_, &CountBarrier::Signal));
 
-	RpcChannel channel(io_service_, "localhost", port);
-	channel.CallMethod(NULL, NULL, request, response_, done_);
+	io_service_.run();
+	barrier_.Wait();
 
-	ASSERT_EQ(request.ByteSize(), server.size);
+	ASSERT_EQ(100, response.num());
 }
 
 } // namespace Egametang

+ 15 - 15
Src/Egametang/Rpc/RPCCommunicator.cc → Src/Egametang/Rpc/RpcCommunicator.cc

@@ -2,30 +2,30 @@
 #include <boost/asio.hpp>
 #include <boost/lexical_cast.hpp>
 #include <glog/logging.h>
-#include "Rpc/RPCCommunicator.h"
+#include "Rpc/RpcCommunicator.h"
 
 namespace Egametang {
 
-RPCCommunicator::RPCCommunicator(boost::asio::io_service& io_service):
+RpcCommunicator::RpcCommunicator(boost::asio::io_service& io_service):
 		io_service_(io_service), socket_(io_service)
 {
 }
 
-boost::asio::ip::tcp::socket& RPCCommunicator::Socket()
+boost::asio::ip::tcp::socket& RpcCommunicator::Socket()
 {
 	return socket_;
 }
 
-void RPCCommunicator::RecvSize()
+void RpcCommunicator::RecvSize()
 {
 	IntPtr size(new int(0));
 	boost::asio::async_read(socket_,
 			boost::asio::buffer(reinterpret_cast<char*>(size.get()), sizeof(int)),
-			boost::bind(&RPCCommunicator::RecvMessage, this, size,
+			boost::bind(&RpcCommunicator::RecvMessage, this, size,
 					boost::asio::placeholders::error));
 }
 
-void RPCCommunicator::RecvMessage(IntPtr size, const boost::system::error_code& err)
+void RpcCommunicator::RecvMessage(IntPtr size, const boost::system::error_code& err)
 {
 	if (err)
 	{
@@ -35,11 +35,11 @@ void RPCCommunicator::RecvMessage(IntPtr size, const boost::system::error_code&
 	StringPtr ss(new std::string(*size, '\0'));
 	boost::asio::async_read(socket_,
 			boost::asio::buffer(reinterpret_cast<char*>(&ss->at(0)), *size),
-			boost::bind(&RPCCommunicator::RecvDone, this, ss,
+			boost::bind(&RpcCommunicator::RecvDone, this, ss,
 					boost::asio::placeholders::error));
 }
 
-void RPCCommunicator::RecvDone(StringPtr ss, const boost::system::error_code& err)
+void RpcCommunicator::RecvDone(StringPtr ss, const boost::system::error_code& err)
 {
 	if (err)
 	{
@@ -49,15 +49,15 @@ void RPCCommunicator::RecvDone(StringPtr ss, const boost::system::error_code& er
 	OnRecvMessage(ss);
 }
 
-void RPCCommunicator::SendSize(int size, std::string message)
+void RpcCommunicator::SendSize(int size, std::string message)
 {
 	boost::asio::async_write(socket_,
 			boost::asio::buffer(reinterpret_cast<char*>(&size), sizeof(int)),
-			boost::bind(&RPCCommunicator::SendMessage, this, message,
+			boost::bind(&RpcCommunicator::SendMessage, this, message,
 					boost::asio::placeholders::error));
 }
 
-void RPCCommunicator::SendMessage(std::string message, const boost::system::error_code& err)
+void RpcCommunicator::SendMessage(std::string message, const boost::system::error_code& err)
 {
 	if (err)
 	{
@@ -65,10 +65,10 @@ void RPCCommunicator::SendMessage(std::string message, const boost::system::erro
 		return;
 	}
 	boost::asio::async_write(socket_, boost::asio::buffer(message),
-			boost::bind(&RPCCommunicator::SendDone, this, boost::asio::placeholders::error));
+			boost::bind(&RpcCommunicator::SendDone, this, boost::asio::placeholders::error));
 }
 
-void RPCCommunicator::SendDone(const boost::system::error_code& err)
+void RpcCommunicator::SendDone(const boost::system::error_code& err)
 {
 	if (err)
 	{
@@ -78,11 +78,11 @@ void RPCCommunicator::SendDone(const boost::system::error_code& err)
 	OnSendMessage();
 }
 
-void RPCCommunicator::OnRecvMessage(StringPtr ss)
+void RpcCommunicator::OnRecvMessage(StringPtr ss)
 {
 }
 
-void RPCCommunicator::OnSendMessage()
+void RpcCommunicator::OnSendMessage()
 {
 }
 

+ 5 - 5
Src/Egametang/Rpc/RPCCommunicator.h → Src/Egametang/Rpc/RpcCommunicator.h

@@ -1,5 +1,5 @@
-#ifndef RPC_RPC_COMMUNICATOR_H
-#define RPC_RPC_COMMUNICATOR_H
+#ifndef RPC_RPCCOMMUNICATOR_H
+#define RPC_RPCCOMMUNICATOR_H
 
 #include <google/protobuf/service.h>
 #include <boost/unordered_map.hpp>
@@ -10,14 +10,14 @@
 
 namespace Egametang {
 
-class RPCCommunicator
+class RpcCommunicator
 {
 protected:
 	boost::asio::io_service& io_service_;
 	boost::asio::ip::tcp::socket socket_;
 
 public:
-	explicit RPCCommunicator(boost::asio::io_service& io_service);
+	explicit RpcCommunicator(boost::asio::io_service& io_service);
 
 	boost::asio::ip::tcp::socket& Socket();
 
@@ -37,4 +37,4 @@ public:
 
 } // namespace Egametang
 
-#endif // RPC_RPC_COMMUNICATOR_H
+#endif // RPC_RPCCOMMUNICATOR_H

+ 16 - 16
Src/Egametang/Rpc/RPCCommunicatorTest.cc → Src/Egametang/Rpc/RpcCommunicatorTest.cc

@@ -3,7 +3,7 @@
 #include <gtest/gtest.h>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
-#include "Rpc/RPCCommunicator.h"
+#include "Rpc/RpcCommunicator.h"
 #include "Thread/ThreadPool.h"
 #include "Thread/CountBarrier.h"
 
@@ -11,7 +11,7 @@ namespace Egametang {
 
 static int global_port = 10001;
 
-class RPCServerTest: public RPCCommunicator
+class RpcServerTest: public RpcCommunicator
 {
 public:
 	CountBarrier& barrier_;
@@ -19,8 +19,8 @@ public:
 	boost::asio::ip::tcp::acceptor acceptor_;
 
 public:
-	RPCServerTest(boost::asio::io_service& io_service, int port, CountBarrier& barrier):
-		RPCCommunicator(io_service), acceptor_(io_service),
+	RpcServerTest(boost::asio::io_service& io_service, int port, CountBarrier& barrier):
+		RpcCommunicator(io_service), acceptor_(io_service),
 		barrier_(barrier)
 	{
 		boost::asio::ip::address address;
@@ -31,7 +31,7 @@ public:
 		acceptor_.bind(endpoint);
 		acceptor_.listen();
 		acceptor_.async_accept(socket_,
-				boost::bind(&RPCServerTest::OnAsyncAccept, this,
+				boost::bind(&RpcServerTest::OnAsyncAccept, this,
 						boost::asio::placeholders::error));
 	}
 
@@ -70,22 +70,22 @@ public:
 	}
 };
 
-class RPCClientTest: public RPCCommunicator
+class RpcClientTest: public RpcCommunicator
 {
 public:
 	CountBarrier& barrier_;
 	std::string recv_string_;
 
 public:
-	RPCClientTest(boost::asio::io_service& io_service, int port,
+	RpcClientTest(boost::asio::io_service& io_service, int port,
 			CountBarrier& barrier):
-		RPCCommunicator(io_service), barrier_(barrier)
+		RpcCommunicator(io_service), barrier_(barrier)
 	{
 		boost::asio::ip::address address;
 		address.from_string("127.0.0.1");
 		boost::asio::ip::tcp::endpoint endpoint(address, port);
 		socket_.async_connect(endpoint,
-				boost::bind(&RPCClientTest::OnAsyncConnect, this,
+				boost::bind(&RpcClientTest::OnAsyncConnect, this,
 						boost::asio::placeholders::error));
 	}
 
@@ -124,17 +124,17 @@ public:
 	}
 };
 
-class RPCCommunicatorTest: public testing::Test
+class RpcCommunicatorTest: public testing::Test
 {
 protected:
 	boost::asio::io_service io_server_;
 	boost::asio::io_service io_client_;
 	CountBarrier barrier_;
-	RPCServerTest rpc_server_;
-	RPCClientTest rpc_client_;
+	RpcServerTest rpc_server_;
+	RpcClientTest rpc_client_;
 
 public:
-	RPCCommunicatorTest():
+	RpcCommunicatorTest():
 		io_server_(), io_client_(),
 		barrier_(2), rpc_server_(io_server_, global_port, barrier_),
 		rpc_client_(io_client_, global_port, barrier_)
@@ -143,11 +143,11 @@ public:
 };
 
 
-TEST_F(RPCCommunicatorTest, ClientSendString)
+TEST_F(RpcCommunicatorTest, ClientSendString)
 {
 	ThreadPool thread_pool(2);
-	thread_pool.PushTask(boost::bind(&RPCServerTest::Start, &rpc_server_));
-	thread_pool.PushTask(boost::bind(&RPCClientTest::Start, &rpc_client_));
+	thread_pool.PushTask(boost::bind(&RpcServerTest::Start, &rpc_server_));
+	thread_pool.PushTask(boost::bind(&RpcClientTest::Start, &rpc_client_));
 	barrier_.Wait();
 	ASSERT_EQ(std::string("send test rpc communicator string"), rpc_server_.recv_string_);
 	ASSERT_EQ(std::string("response test rpc communicator string"), rpc_client_.recv_string_);

+ 1 - 1
Src/Egametang/Rpc/RpcSession.cc

@@ -3,7 +3,7 @@
 namespace Egametang {
 
 RpcSession::RpcSession(RpcServer& rpc_server):
-		rpc_server_(rpc_server), RPCCommunicator(rpc_server_.io_service_)
+		rpc_server_(rpc_server), RpcCommunicator(rpc_server_.io_service_)
 {
 }
 

+ 1 - 1
Src/Egametang/Rpc/RpcSession.h

@@ -12,7 +12,7 @@ class RpcServer;
 
 class RpcSession:
 		private boost::noncopyable,
-		public RPCCommunicator,
+		public RpcCommunicator,
 		public boost::enable_shared_from_this<RpcSession>
 {
 private: