Ver Fonte

添加RpcChannelTest.cc,准备先测试RpcChannel类

tanghai há 15 anos atrás
pai
commit
902999b733

+ 15 - 11
src/Net/RpcChannel.cc

@@ -5,8 +5,9 @@
 
 namespace Hainan {
 
-RpcChannel::RpcChannel(std::string& host, int port):
-		id(0), io_service()
+RpcChannel::RpcChannel(
+		boost::asio::io_service& service, std::string& host, int port):
+		io_service(service)
 {
 	// another thread?
 	boost::asio::ip::address address;
@@ -17,16 +18,17 @@ RpcChannel::RpcChannel(std::string& host, int port):
 					boost::asio::placeholders::error));
 }
 
-void RpcChannel::AsyncConnectHandler(const boost::system::error_code& err) {
+void RpcChannel::AsyncConnectHandler(const boost::system::error_code& err)
+{
 	if (err)
 	{
 		LOG(ERROR) << "async connect failed";
 		return;
 	}
-	RecvRequestSize();
+	RecvMessegeSize();
 }
 
-void RpcChannel::RecvRequestSize()
+void RpcChannel::RecvMessegeSize()
 {
 	IntPtr size(new int);
 	boost::asio::async_read(socket,
@@ -50,8 +52,8 @@ void RpcChannel::RecvMessage(IntPtr size, const boost::system::error_code& err)
 					boost::asio::placeholders::error));
 }
 
-void RpcChannel::RecvMessageHandler(StringPtr ss,
-		const boost::system::error_code& err)
+void RpcChannel::RecvMessageHandler(
+		StringPtr ss, const boost::system::error_code& err)
 {
 	if (err)
 	{
@@ -63,10 +65,12 @@ void RpcChannel::RecvMessageHandler(StringPtr ss,
 	Response->ParseFromString(*ss);
 	RpcHandlerPtr handler = handlers[response.id()];
 	handler->GetResponse()->ParseFromString(response.response());
+
+
 	handlers.erase(response.id());
 
 	// read size
-	RecvRequestSize();
+	RecvMessegeSize();
 }
 
 void RpcChannel::SendMessageHandler(int32 id, RpcHandlerPtr handler,
@@ -94,8 +98,8 @@ void RpcChannel::SendMessage(const RpcRequestPtr request,
 					handler, boost::asio::placeholders::error));
 }
 
-void RpcChannel::SendRequestSize(const RpcRequestPtr request,
-		RpcHandlerPtr handler)
+void RpcChannel::SendMessageSize(
+		const RpcRequestPtr request, RpcHandlerPtr handler)
 {
 	int size = request->ByteSize();
 	std::string ss = boost::lexical_cast(size);
@@ -116,7 +120,7 @@ void RpcChannel::CallMethod(
 	req->set_method(method->full_name());
 	req->set_request(request->SerializeAsString());
 	RpcHandlerPtr handler(new RpcHandler(controller, response, done));
-	SendRequestSize(req, handler);
+	SendMessageSize(req, handler);
 }
 
 } // namespace Hainan

+ 5 - 3
src/Net/RpcChannel.h

@@ -17,18 +17,18 @@ private:
 
 	int32 id;
 	RpcCallbackMap handlers;
-	boost::asio::io_service io_service;
+	boost::asio::io_service& io_service;
 	boost::asio::ip::tcp::socket socket;
 
 	void AsyncConnectHandler(const boost::system::error_code& err);
 
 	// recieve response
-	void RecvRequestSize();
+	void RecvMessegeSize();
 	void RecvMessage(IntPtr size, const boost::system::error_code& err);
 	void RecvMessageHandler(StringPtr ss, const boost::system::error_code& err);
 
 	// send request
-	void SendRequestSize(const RpcRequestPtr request, RpcHandlerPtr handler);
+	void SendMessageSize(const RpcRequestPtr request, RpcHandlerPtr handler);
 	void SendMessage(const RpcRequestPtr request,
 			RpcHandlerPtr handler, const boost::system::error_code& err);
 	void SendMessageHandler(int32 id, RpcHandlerPtr handler,
@@ -45,6 +45,8 @@ public:
 			google::protobuf::Closure* done);
 };
 
+typedef boost::shared_ptr<RpcChannel> RpcChannelPtr;
+
 } // namespace Hainan
 
 #endif // NET_RPC_CHANNEL_H

+ 72 - 0
src/Net/RpcChannelTest.cc

@@ -0,0 +1,72 @@
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include "Net/RpcChannel.h"
+
+namespace Hainan {
+
+static int port = 10001;
+
+class RpcServerTest
+{
+public:
+	boost::asio::io_service io_service;
+	boost::asio::ip::tcp::acceptor acceptor;
+	boost::asio::ip::tcp::socket socket;
+
+	int size;
+
+public:
+	RpcServerTest(boost::asio::io_service& service, int port):
+		io_service(service), size(0)
+	{
+		boost::asio::ip::address address;
+		address.from_string("localhost");
+		boost::asio::ip::tcp::endpoint endpoint(address, port);
+		boost::asio::ip::tcp::acceptor acceptor;
+		acceptor.open(endpoint.protocol());
+		acceptor.set_option(
+		boost::asio::ip::tcp::acceptor::reuse_address(true));
+		acceptor.bind(endpoint);
+		acceptor.listen();
+		acceptor.async_accept(socket);
+	}
+	~RpcServerTest();
+
+	void RecvMessageSize()
+	{
+	}
+};
+
+class RpcChannelTest: public testing::Test
+{
+private:
+	int port;
+public:
+	RpcChannelTest()
+	{
+	}
+
+	void SetUp()
+	{
+		port = 10001;
+	}
+
+	void TearDown()
+	{
+	}
+};
+
+
+TEST_F(RpcChannelTest, CallMethod)
+{
+	RpcServerTest server(io_service, port);
+	ASSERT_EQ(0, server.size);
+
+	RpcChannel channel(io_service, "localhost", port);
+	channel.CallMethod(NULL, NULL, request, response, done);
+
+	ASSERT_EQ(request.ByteSize(), server.size);
+}
+
+} // namespace Hainan

+ 4 - 2
src/Net/RpcServer.cc

@@ -3,13 +3,15 @@
 #include "Base/Base.h"
 #include "Net/RpcServer.h"
 #include "Net/RpcSession.h"
+#include "Thread/ThreadPool.h"
 
 namespace Hainan {
 
-RpcServer::RpcServer(std::string& host, int port): thread_pool()
+RpcServer::RpcServer(google::protobuf::Service& pservice, int port):
+		service(pservice), io_service()
 {
 	boost::asio::ip::address address;
-	address.from_string(host);
+	address.from_string("localhost");
 	boost::asio::ip::tcp::endpoint endpoint(address, port);
 	acceptor.open(endpoint.protocol());
 	acceptor.set_option(

+ 2 - 4
src/Net/RpcServer.h

@@ -6,17 +6,15 @@ namespace Hainan {
 class RpcServer
 {
 private:
-	typedef boost::unordered_map<std::string, RpcHandlerPtr> RpcServiceMap;
 	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
 
-	RpcServiceMap services;
+	google::protobuf::Service& service;
 	boost::asio::io_service io_service;
 	boost::asio::ip::tcp::acceptor acceptor;
-	ThreadPool thread_pool;
 	RpcSessionSet sessions;
 
 public:
-	RpcServer();
+	RpcServer(google::protobuf::Service& pservice, int port);
 	~RpcServer();
 	void Start();
 	void Stop();

+ 0 - 0
src/Net/RpcService.cc


+ 25 - 0
src/Net/RpcService.h

@@ -0,0 +1,25 @@
+#ifndef NET_RPC_SERVICE_H
+#define NET_RPC_SERVICE_H
+
+#include <google/protobuf/service.h>
+
+namespace Hainan {
+
+class RpcService
+{
+private:
+	typedef boost::unordered_map<std::string, RpcHandlerPtr> RpcServiceMap;
+
+	RpcServiceMap services;
+
+
+public:
+	RpcServer(google::protobuf::Service* service);
+	~RpcServer();
+	void Start();
+	void Stop();
+};
+
+} // namespace Hainan
+
+#endif // NET_RPC_SERVICE_H

+ 3 - 3
src/Net/RpcSession.cc

@@ -11,7 +11,7 @@ boost::asio::ip::tcp::socket& RpcSession::Socket()
 	return socket;
 }
 
-void RpcSession::RecvRequestSize()
+void RpcSession::RecvMessegeSize()
 {
 	IntPtr size(new int);
 	boost::asio::async_read(socket,
@@ -49,12 +49,12 @@ void RpcSession::RecvMessageHandler(StringPtr ss,
 
 
 	// read size
-	RecvRequestSize();
+	RecvMessegeSize();
 }
 
 void RpcSession::Start()
 {
-	RecvRequestSize();
+	RecvMessegeSize();
 }
 
 void RpcSession::Stop()

+ 3 - 1
src/Net/RpcSession.h

@@ -1,6 +1,7 @@
 #ifndef NET_RPC_SESSION_H
 #define NET_RPC_SESSION_H
 
+#include <list>
 #include <boost/asio.hpp>
 #include <boost/array.hpp>
 #include <boost/noncopyable.hpp>
@@ -16,11 +17,12 @@ private:
 	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
 
 	boost::asio::ip::tcp::socket socket;
+	std::list<RpcResponsePtr> responses;
 	RpcSessionSet& sessions;
 	ThreadPool& thread_pool;
 
 public:
-	RpcSession(RpcSessionSet& rpc_sessions);
+	RpcSession(RpcSessionSet& rpc_sessions, ThreadPool& pool);
 	~RpcSession();
 	boost::asio::ip::tcp::socket& Socket();
 	void Start();