Jelajahi Sumber

RpcChannelTest还有一个bug尚未找到

Tang Hai 14 tahun lalu
induk
melakukan
cde5b04438

+ 3 - 3
Src/Egametang/Rpc/CMakeLists.txt

@@ -6,7 +6,7 @@ SET(RpcSrc
 	RpcCommunicator.cc
 	RpcController.cc
 	RpcHandler.cc
-#	RpcChannel.cc
+	RpcChannel.cc
 	${proto_srcs}
 	)
 
@@ -14,11 +14,11 @@ SET(RpcSrc
 ADD_LIBRARY(Rpc ${RpcSrc})
 
 ADD_EXECUTABLE(RpcCommunicatorTest RpcCommunicatorTest.cc)
-#ADD_EXECUTABLE(RpcChannelTest RpcChannelTest.cc)
+ADD_EXECUTABLE(RpcChannelTest RpcChannelTest.cc)
 
 SET(Excutes 
 	RpcCommunicatorTest
-#	RpcChannelTest
+	RpcChannelTest
 )
 
 FOREACH(Excute ${Excutes})

+ 10 - 11
Src/Egametang/Rpc/RpcChannel.cc

@@ -35,19 +35,16 @@ void RpcChannel::OnAsyncConnect(const boost::system::error_code& err)
 	RecvMeta();
 }
 
-void RpcChannel::OnRecvMessage(StringPtr ss)
+void RpcChannel::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
 {
-	RpcResponse response;
-	response.ParseFromString(*ss);
-	RpcHandlerPtr handler = handlers_[response.id()];
-	handler->GetResponse()->ParseFromString(response.response());
-
-	handlers_.erase(response.id());
-
+	VLOG(2) << "RpcChannel::OnRecvMessage";
+	RpcHandlerPtr handler = handlers_[meta->id];
+	handler->GetResponse()->ParseFromString(*message);
 	if (handler->GetDone() != NULL)
 	{
 		handler->GetDone()->Run();
 	}
+	handlers_.erase(meta->id);
 
 	// read size
 	RecvMeta();
@@ -70,16 +67,18 @@ void RpcChannel::CallMethod(
 		google::protobuf::Closure* done)
 {
 	RpcHandlerPtr handler(new RpcHandler(controller, response, done));
-	handlers_[id_] = handler;
+	handlers_[++id_] = handler;
 
 	boost::hash<std::string> string_hash;
 
 	std::string message = request->SerializeAsString();
 	RpcMeta meta;
 	meta.size = message.size();
-	meta.id = ++id_;
+	VLOG(3) << "send size: " << meta.size;
+	meta.id = id_;
 	meta.method = string_hash(method->full_name());
-
+	VLOG(3) << "send meta1: " << meta.size << " "
+					<< meta.id << " " << meta.method;
 	SendMeta(meta, message);
 }
 

+ 2 - 2
Src/Egametang/Rpc/RpcChannel.h

@@ -17,14 +17,14 @@ class RpcChannel:
 		public RpcCommunicator
 {
 private:
-	typedef boost::unordered_map<int32, RpcHandlerPtr> RpcCallbackMap;
+	typedef boost::unordered_map<std::size_t, RpcHandlerPtr> RpcCallbackMap;
 
 	int32 id_;
 	RpcCallbackMap handlers_;
 
 	void OnAsyncConnect(const boost::system::error_code& err);
 
-	virtual void OnRecvMessage(StringPtr ss);
+	virtual void OnRecvMessage(RpcMetaPtr meta, StringPtr message);
 	virtual void OnSendMessage();
 
 public:

+ 30 - 29
Src/Egametang/Rpc/RpcChannelTest.cc

@@ -42,7 +42,7 @@ public:
 			LOG(ERROR) << "async accept failed: " << err.message();
 			return;
 		}
-		RecvSize();
+		RecvMeta();
 	}
 
 	void Stop()
@@ -51,28 +51,26 @@ public:
 		socket_.close();
 	}
 
-	virtual void OnRecvMessage(StringPtr ss)
+	virtual void OnRecvMessage(RpcMetaPtr meta, StringPtr message)
 	{
 		// 接收消息
-		RpcRequest rpc_request;
-		rpc_request.ParseFromString(*ss);
 		EchoRequest request;
-		request.ParseFromString(rpc_request.request());
+		request.ParseFromString(*message);
 
 		num_ = request.num();
 		VLOG(2) << "num: " << num_;
 
 		// 回一个消息
-		RpcResponse rpc_response;
-		rpc_response.set_id(rpc_request.id());
-		rpc_response.set_type(RESPONSE_TYPE_OK);
-
 		EchoResponse response;
 		response.set_num(num_);
-		rpc_response.set_response(response.SerializeAsString());
-
-		std::string send_string = rpc_response.SerializeAsString();
-		SendSize(send_string.size(), send_string);
+		std::string send_string = response.SerializeAsString();
+
+		RpcMeta response_meta;
+		response_meta.id = meta->id;
+		response_meta.size = send_string.size();
+		VLOG(3) << "send meta: " << response_meta.size << " "
+				<< response_meta.id << " " << response_meta.method;
+		SendMeta(response_meta, send_string);
 		barrier_.Signal();
 	}
 	virtual void OnSendMessage()
@@ -82,39 +80,42 @@ public:
 
 class RpcChannelTest: public testing::Test
 {
-private:
-	CountBarrier barrier_;
-
-public:
-	RpcChannelTest(): barrier_(2)
-	{
-	}
 };
 
+static void IOServiceRun(boost::asio::io_service& io_service)
+{
+	io_service.run();
+}
 
-TEST_F(RpcChannelTest, CallMethod)
+TEST_F(RpcChannelTest, Echo)
 {
-	boost::asio::io_service io_service;
-	RpcServerTest rpc_server(io_service, global_port, barrier_);
+	boost::asio::io_service io_server;
+	boost::asio::io_service io_client;
+
+	CountBarrier barrier(2);
+	RpcServerTest rpc_server(io_server, global_port, barrier);
 
-	RpcChannel rpc_channel(io_service, "127.0.0.1", global_port);
+	RpcChannel rpc_channel(io_client, "127.0.0.1", global_port);
 	EchoService_Stub service(&rpc_channel);
 
 	ThreadPool thread_pool(2);
-	thread_pool.PushTask(boost::bind(&boost::asio::io_service::run, &io_service));
+	thread_pool.PushTask(boost::bind(&IOServiceRun, boost::ref(io_server)));
+	thread_pool.PushTask(boost::bind(&IOServiceRun, boost::ref(io_client)));
 
 	EchoRequest request;
 	request.set_num(100);
+
 	EchoResponse response;
 
 	ASSERT_EQ(0, response.num());
 	service.Echo(NULL, &request, &response,
-			google::protobuf::NewCallback(&barrier_, &CountBarrier::Signal));
-
-	barrier_.Wait();
+			google::protobuf::NewCallback(&barrier, &CountBarrier::Signal));
+	barrier.Wait();
 	rpc_channel.Stop();
 	rpc_server.Stop();
-	io_service.stop();
+	io_server.stop();
+	io_client.stop();
+	// rpc_channel是个无限循环的操作, 必须主动让channel和server stop才能wait线程
 	thread_pool.Wait();
 
 	ASSERT_EQ(100, response.num());

+ 4 - 2
Src/Egametang/Rpc/RpcCommunicator.cc

@@ -51,8 +51,9 @@ void RpcCommunicator::RecvDone(RpcMetaPtr meta, StringPtr message,
 	OnRecvMessage(meta, message);
 }
 
-void RpcCommunicator::SendMeta(RpcMeta& meta, std::string message)
+void RpcCommunicator::SendMeta(RpcMeta meta, std::string message)
 {
+	CHECK_EQ(meta.size, message.size()) << "meta and message size not match!";
 	boost::asio::async_write(socket_,
 			boost::asio::buffer(reinterpret_cast<char*>(&meta), sizeof(meta)),
 			boost::bind(&RpcCommunicator::SendMessage, this, message,
@@ -67,7 +68,8 @@ void RpcCommunicator::SendMessage(std::string message, const boost::system::erro
 		return;
 	}
 	boost::asio::async_write(socket_, boost::asio::buffer(message),
-			boost::bind(&RpcCommunicator::SendDone, this, boost::asio::placeholders::error));
+			boost::bind(&RpcCommunicator::SendDone, this,
+					boost::asio::placeholders::error));
 }
 
 void RpcCommunicator::SendDone(const boost::system::error_code& err)

+ 1 - 1
Src/Egametang/Rpc/RpcCommunicator.h

@@ -48,7 +48,7 @@ public:
 	void RecvDone(RpcMetaPtr meta, StringPtr message, const boost::system::error_code& err);
 
 	// send request
-	void SendMeta(RpcMeta& meta, std::string message);
+	void SendMeta(RpcMeta meta, std::string message);
 	void SendMessage(std::string message, const boost::system::error_code& err);
 	void SendDone(const boost::system::error_code& err);
 

+ 0 - 1
Src/Egametang/Rpc/RpcCommunicatorTest.cc

@@ -168,7 +168,6 @@ TEST_F(RpcCommunicatorTest, SendAndRecvString)
 	rpc_server_.Stop();
 	rpc_client_.Stop();
 
-	boost::hash<std::string> string_hash;
 	ASSERT_EQ(std::string("send test rpc communicator string"), rpc_server_.recv_string_);
 	ASSERT_EQ(rpc_server_.meta_->size, rpc_server_.recv_string_.size());
 	ASSERT_EQ(654321U, rpc_server_.meta_->method);

+ 5 - 0
Src/Egametang/Thread/CountBarrier.cc

@@ -32,4 +32,9 @@ int CountBarrier::Count() const
 	return count_;
 }
 
+void CountBarrier::Reset(int count)
+{
+	count_ = count;
+}
+
 } // namespace Egametang

+ 6 - 1
Src/Egametang/Thread/CountBarrier.h

@@ -13,10 +13,15 @@ private:
 	boost::condition_variable condition_;
 
 public:
-	explicit CountBarrier(int count);
+	explicit CountBarrier(int count = 1);
+
 	void Wait();
+
 	void Signal();
+
 	int Count() const;
+
+	void Reset(int count = 1);
 };
 
 } // namespace Egametang

+ 1 - 0
Src/Egametang/Thread/ThreadPool.cc

@@ -33,6 +33,7 @@ void ThreadPool::Wait()
 	{
 		done_.wait(lock);
 	}
+	running_ = true;
 }
 
 void ThreadPool::Runner()