Sfoglia il codice sorgente

增加RpcServerTest,仍有bug

tanghai 14 anni fa
parent
commit
99ba314385

+ 2 - 0
Src/Egametang/Rpc/CMakeLists.txt

@@ -19,11 +19,13 @@ ADD_LIBRARY(Rpc ${RpcSrc})
 ADD_EXECUTABLE(RpcCommunicatorTest RpcCommunicatorTest.cc)
 ADD_EXECUTABLE(RpcChannelTest RpcChannelTest.cc)
 ADD_EXECUTABLE(RpcSessionTest RpcSessionTest.cc)
+ADD_EXECUTABLE(RpcServerTest RpcServerTest.cc)
 
 SET(Excutes 
 	RpcCommunicatorTest
 	RpcChannelTest
 	RpcSessionTest
+	RpcServerTest
 )
 
 FOREACH(Excute ${Excutes})

+ 14 - 11
Src/Egametang/Rpc/RpcChannelTest.cc

@@ -9,8 +9,6 @@
 
 namespace Egametang {
 
-static int global_port = 10002;
-
 class RpcServerTest: public RpcCommunicator
 {
 public:
@@ -81,8 +79,11 @@ public:
 
 class RpcChannelTest: public testing::Test
 {
+protected:
+	int port;
+
 public:
-	RpcChannelTest()
+	RpcChannelTest(): port(10002)
 	{
 	}
 	virtual ~RpcChannelTest()
@@ -90,9 +91,10 @@ public:
 	}
 };
 
-static void IOServiceRun(boost::asio::io_service& io_service)
+static void IOServiceRun(boost::asio::io_service* io_service, CountBarrier* barrier)
 {
-	io_service.run();
+	io_service->run();
+	barrier->Signal();
 }
 
 TEST_F(RpcChannelTest, Echo)
@@ -101,15 +103,16 @@ TEST_F(RpcChannelTest, Echo)
 	boost::asio::io_service io_client;
 
 	CountBarrier barrier(2);
-	RpcServerTest rpc_server(io_server, global_port, barrier);
+	ThreadPool thread_pool(2);
+	thread_pool.Schedule(boost::bind(&IOServiceRun, &io_server, &barrier));
+	thread_pool.Schedule(boost::bind(&IOServiceRun, &io_client, &barrier));
+	barrier.Wait();
 
-	RpcChannel rpc_channel(io_client, "127.0.0.1", global_port);
+	barrier.Reset(2);
+	RpcServerTest rpc_server(io_server, port, barrier);
+	RpcChannel rpc_channel(io_client, "127.0.0.1", port);
 	EchoService_Stub service(&rpc_channel);
 
-	ThreadPool thread_pool(2);
-	thread_pool.Schedule(boost::bind(&IOServiceRun, boost::ref(io_server)));
-	thread_pool.Schedule(boost::bind(&IOServiceRun, boost::ref(io_client)));
-
 	EchoRequest request;
 	request.set_num(100);
 

+ 2 - 2
Src/Egametang/Rpc/RpcCommunicator.cc

@@ -6,8 +6,8 @@
 
 namespace Egametang {
 
-RpcCommunicator::RpcCommunicator(boost::asio::io_service& io_service):
-		io_service(io_service), socket(io_service)
+RpcCommunicator::RpcCommunicator(boost::asio::io_service& service):
+		io_service(service), socket(service)
 {
 }
 

+ 8 - 8
Src/Egametang/Rpc/RpcServer.cc

@@ -14,7 +14,9 @@
 namespace Egametang {
 
 RpcServer::RpcServer(boost::asio::io_service& service, int port):
-		io_service(service), acceptor(io_service), thread_pool()
+		io_service(service), acceptor(io_service),
+		thread_pool(), sessions(),
+		methods()
 {
 	boost::asio::ip::address address;
 	address.from_string("127.0.0.1");
@@ -23,15 +25,14 @@ RpcServer::RpcServer(boost::asio::io_service& service, int port):
 	acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
 	acceptor.bind(endpoint);
 	acceptor.listen();
-	RpcSessionPtr new_session(new RpcSession(*this));
+	RpcSessionPtr new_session(new RpcSession(io_service, *this));
 	acceptor.async_accept(new_session->Socket(),
 			boost::bind(&RpcServer::OnAsyncAccept, this,
 					new_session, boost::asio::placeholders::error));
 }
 
-boost::asio::io_service& RpcServer::IOService()
+RpcServer::~RpcServer()
 {
-	return io_service;
 }
 
 void RpcServer::OnAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err)
@@ -43,7 +44,7 @@ void RpcServer::OnAsyncAccept(RpcSessionPtr session, const boost::system::error_
 	}
 	session->Start();
 	sessions.insert(session);
-	RpcSessionPtr new_session(new RpcSession(*this));
+	RpcSessionPtr new_session(new RpcSession(io_service, *this));
 	acceptor.async_accept(new_session->Socket(),
 			boost::bind(&RpcServer::OnAsyncAccept, this,
 					new_session, boost::asio::placeholders::error));
@@ -70,11 +71,13 @@ void RpcServer::Stop()
 void RpcServer::RunService(RpcSessionPtr session, RpcMetaPtr meta,
 		StringPtr message, MessageHandler message_handler)
 {
+	VLOG(3) << "meta: " << meta->ToString();
 	MethodInfoPtr method_info = methods[meta->method];
 
 	ResponseHandlerPtr response_handler(
 			new ResponseHandler(method_info, meta->id, message_handler));
 	response_handler->Request()->ParseFromString(*message);
+	VLOG(3) << "request: " << response_handler->Request()->DebugString();
 
 	google::protobuf::Closure* done = google::protobuf::NewCallback(
 			this, &RpcServer::OnCallMethod,
@@ -100,9 +103,6 @@ void RpcServer::Register(RpcServicePtr service)
 		CHECK(methods.find(method_hash) == methods.end());
 		methods[method_hash] = method_info;
 	}
-
-	const google::protobuf::Descriptor* descriptor =
-	    google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName();
 }
 
 void RpcServer::Remove(RpcSessionPtr& session)

+ 1 - 1
Src/Egametang/Rpc/RpcServer.h

@@ -5,6 +5,7 @@
 #include <boost/unordered_set.hpp>
 #include <boost/unordered_map.hpp>
 #include <google/protobuf/service.h>
+#include "Base/Marcos.h"
 #include "Thread/ThreadPool.h"
 #include "Rpc/RpcTypedef.h"
 
@@ -29,7 +30,6 @@ public:
 	RpcServer(boost::asio::io_service& io_service, int port);
 	virtual ~RpcServer();
 
-	virtual boost::asio::io_service& IOService();
 	virtual void RunService(RpcSessionPtr session, RpcMetaPtr meta,
 			StringPtr message, MessageHandler handler);
 	virtual void Register(RpcServicePtr service);

+ 0 - 1
Src/Egametang/Rpc/RpcServerMock.h

@@ -19,7 +19,6 @@ public:
 	{
 	}
 
-	MOCK_METHOD0(IOService, boost::asio::io_service&());
 	MOCK_METHOD4(RunService, void(RpcSessionPtr, RpcMetaPtr, StringPtr, MessageHandler));
 	MOCK_METHOD1(Register, void(RpcServicePtr));
 	MOCK_METHOD1(Remove, void(RpcSessionPtr&));

+ 104 - 0
Src/Egametang/Rpc/RpcServerTest.cc

@@ -0,0 +1,104 @@
+#include <boost/bind.hpp>
+#include <boost/asio.hpp>
+#include <boost/function.hpp>
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <google/protobuf/service.h>
+#include "Thread/CountBarrier.h"
+#include "Thread/ThreadPool.h"
+#include "Rpc/RpcChannel.h"
+#include "Rpc/RpcServer.h"
+#include "Rpc/RpcSession.h"
+#include "Rpc/RpcServerMock.h"
+#include "Rpc/Echo.pb.h"
+#include "Thread/ThreadPool.h"
+
+namespace Egametang {
+
+class MyEcho: public EchoService
+{
+public:
+	virtual void Echo(
+			google::protobuf::RpcController* controller,
+			const EchoRequest* request,
+			EchoResponse* response,
+			google::protobuf::Closure* done)
+	{
+		int32 num = request->num();
+		response->set_num(num);
+		VLOG(2) << "echo response: " << response->DebugString();
+		if (done)
+		{
+			done->Run();
+		}
+	}
+};
+
+static void IOServiceRun(boost::asio::io_service* io_service)
+{
+	io_service->run();
+}
+
+class RpcServerTest: public testing::Test
+{
+protected:
+	boost::asio::io_service io_server;
+	boost::asio::io_service io_client;
+	int port;
+
+public:
+	RpcServerTest(): io_server(), io_client(), port(10003)
+	{
+	}
+
+	virtual ~RpcServerTest()
+	{
+	}
+};
+
+TEST_F(RpcServerTest, ChannelAndServer)
+{
+	ThreadPool thread_pool(2);
+
+	RpcServicePtr echo_sevice(new MyEcho);
+	RpcServer server(io_server, port);
+	server.Register(echo_sevice);
+	ASSERT_EQ(1U, server.methods.size());
+
+	RpcChannel channel(io_client, "127.0.0.1", port);
+	EchoService_Stub service(&channel);
+
+	thread_pool.Schedule(boost::bind(&IOServiceRun, &io_server));
+	thread_pool.Schedule(boost::bind(&IOServiceRun, &io_client));
+
+	EchoRequest request;
+	request.set_num(100);
+	EchoResponse response;
+	ASSERT_EQ(0U, response.num());
+
+	CountBarrier barrier(1);
+	service.Echo(NULL, &request, &response,
+			google::protobuf::NewCallback(&barrier, &CountBarrier::Signal));
+	barrier.Wait();
+
+	channel.Stop();
+	server.Stop();
+	io_client.stop();
+	io_server.stop();
+	// rpc_channel是个无限循环的操作, 必须主动让channel和server stop才能wait线程
+	thread_pool.Wait();
+
+	ASSERT_EQ(100, response.num());
+}
+
+} // namespace Egametang
+
+
+int main(int argc, char* argv[])
+{
+	testing::InitGoogleTest(&argc, argv);
+	google::ParseCommandLineFlags(&argc, &argv, true);
+	google::InitGoogleLogging(argv[0]);
+	return RUN_ALL_TESTS();
+}

+ 6 - 2
Src/Egametang/Rpc/RpcSession.cc

@@ -4,8 +4,12 @@
 
 namespace Egametang {
 
-RpcSession::RpcSession(RpcServer& server):
-		rpc_server(server), RpcCommunicator(rpc_server.IOService())
+RpcSession::RpcSession(boost::asio::io_service& io_service, RpcServer& server):
+		RpcCommunicator(io_service), rpc_server(server)
+{
+}
+
+RpcSession::~RpcSession()
 {
 }
 

+ 1 - 1
Src/Egametang/Rpc/RpcSession.h

@@ -20,7 +20,7 @@ private:
 	virtual void OnSendMessage(RpcMetaPtr meta, StringPtr message);
 
 public:
-	RpcSession(RpcServer& server);
+	RpcSession(boost::asio::io_service& io_service, RpcServer& server);
 	~RpcSession();
 	void Start();
 	void Stop();

+ 1 - 1
Src/Egametang/Rpc/RpcSessionTest.cc

@@ -19,7 +19,7 @@ protected:
 public:
 	RpcSessionTest():
 		io_service(), port(10000),
-		mock_server(io_service, port), session(mock_server)
+		mock_server(io_service, port), session(io_service, mock_server)
 	{
 	}