Quellcode durchsuchen

更改了RpcCommunicator接口,发送和接收的接口十分对称,非常漂亮

tanghai vor 14 Jahren
Ursprung
Commit
259c1976a8

+ 39 - 19
Src/Egametang/Rpc/RPCCommunicator.cc

@@ -1,3 +1,5 @@
+#include <boost/bind.hpp>
+#include <boost/asio.hpp>
 #include "Rpc/RpcCommunicator.h"
 
 namespace Egametang {
@@ -6,12 +8,16 @@ RPCCommunicator::RPCCommunicator()
 {
 }
 
-void RPCCommunicator::RecvMessegeSize()
+boost::asio::ip::tcp::socket& RPCCommunicator::Socket()
+{
+	return socket_;
+}
+
+void RPCCommunicator::RecvSize()
 {
 	IntPtr size(new int);
 	boost::asio::async_read(socket_,
-			boost::asio::buffer(
-					reinterpret_cast<char*>(size.get()), sizeof(int)),
+			boost::asio::buffer(reinterpret_cast<char*>(size.get()), sizeof(int)),
 			boost::bind(&RPCCommunicator::RecvMessage, this, size,
 					boost::asio::placeholders::error));
 }
@@ -20,39 +26,53 @@ void RPCCommunicator::RecvMessage(IntPtr size, const boost::system::error_code&
 {
 	if (err)
 	{
-		LOG(ERROR) << "receive response size failed";
+		LOG(ERROR) << "receive message size failed: " << err.message();
 		return;
 	}
 	StringPtr ss;
 	boost::asio::async_read(socket_,
 			boost::asio::buffer(*ss, *size),
-			boost::bind(&RPCCommunicator::OnRecvMessage, this, ss,
+			boost::bind(&RPCCommunicator::RecvDone, this, ss,
 					boost::asio::placeholders::error));
 }
 
-
-void RPCCommunicator::SendMessage(const RpcRequestPtr request,
-		RpcHandlerPtr handler, const boost::system::error_code& err)
+void RPCCommunicator::RecvDone(StringPtr ss, const boost::system::error_code& err)
 {
 	if (err)
 	{
-		LOG(ERROR) << "SendRequestSize error:";
+		LOG(ERROR) << "receive message failed: " << err.message();
 		return;
 	}
-	std::string ss = request->SerializeAsString();
-	boost::asio::async_write(socket_, boost::asio::buffer(ss),
-			boost::bind(&RPCCommunicator::OnSendMessage, this, request->id(),
-					handler, boost::asio::placeholders::error));
+	OnRecvMessage(ss);
 }
 
-void RPCCommunicator::SendMessageSize(
-		const RpcRequestPtr request, RpcHandlerPtr handler)
+void RPCCommunicator::SendSize(int size, std::string message)
 {
-	int size = request->ByteSize();
-	std::string ss = boost::lexical_cast(size);
-	boost::asio::async_write(socket_, boost::asio::buffer(ss),
-			boost::bind(&RPCCommunicator::SendMessage, this, request,
+	std::string ssize = boost::lexical_cast(size);
+	boost::asio::async_write(socket_, boost::asio::buffer(ssize),
+			boost::bind(&RPCCommunicator::SendMessage, this, message,
 					handler, boost::asio::placeholders::error));
 }
 
+void RPCCommunicator::SendMessage(std::string message, const boost::system::error_code& err)
+{
+	if (err)
+	{
+		LOG(ERROR) << "send message size failed: " << err.message();
+		return;
+	}
+	boost::asio::async_write(socket_, boost::asio::buffer(message),
+			boost::bind(&RPCCommunicator::SendDone, this, boost::asio::placeholders::error));
+}
+
+void RPCCommunicator::SendDone(const boost::system::error_code& err)
+{
+	if (err)
+	{
+		LOG(ERROR) << "send message failed: " << err.message();
+		return;
+	}
+	OnSendMessage();
+}
+
 } // namespace Egametang

+ 11 - 7
Src/Egametang/Rpc/RPCCommunicator.h

@@ -4,6 +4,7 @@
 #include <google/protobuf/service.h>
 #include <boost/unordered_map.hpp>
 #include <boost/asio.hpp>
+#include "Base/Marcos.h"
 #include "Base/Typedef.h"
 #include "Rpc/RpcTypedef.h"
 
@@ -16,18 +17,21 @@ protected:
 
 public:
 	RPCCommunicator();
+
+	boost::asio::ip::tcp::socket& Socket();
+
 	// recieve response
-	void RecvMessegeSize();
+	void RecvSize();
 	void RecvMessage(IntPtr size, const boost::system::error_code& err);
+	void RecvDone(StringPtr ss, const boost::system::error_code& err);
 
 	// send request
-	void SendMessageSize(const RpcRequestPtr request, RpcHandlerPtr handler);
-	void SendMessage(const RpcRequestPtr request, RpcHandlerPtr handler,
-			const boost::system::error_code& err);
+	void SendSize(int size, std::string message);
+	void SendMessage(std::string message, const boost::system::error_code& err);
+	void SendDone(const boost::system::error_code& err);
 
-	virtual void OnRecvMessage(StringPtr ss, const boost::system::error_code& err) = 0;
-	virtual void OnSendMessage(int32 id, RpcHandlerPtr handler,
-			const boost::system::error_code& err) = 0;
+	virtual void OnRecvMessage(StringPtr ss) = 0;
+	virtual void OnSendMessage(int32 id, RpcHandlerPtr handler) = 0;
 };
 
 } // namespace Egametang

+ 131 - 0
Src/Egametang/Rpc/RPCCommunicatorTest.cc

@@ -0,0 +1,131 @@
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include "Rpc/RPCCommunicator.h"
+
+namespace Egametang {
+
+static int port = 10001;
+
+class RPCServerTest: public RPCCommunicator
+{
+public:
+	StringPtr recv_string_;
+
+	int send_id_;
+	RpcHandlerPtr send_handler_;
+
+	boost::asio::io_service& io_service_;
+	boost::asio::ip::tcp::acceptor acceptor_;
+
+	boost::asio::ip::tcp::socket server_socket;
+
+public:
+	RPCServerTest(boost::asio::io_service& io_service, int port)
+	{
+		boost::asio::ip::address address;
+		address.from_string("localhost");
+		boost::asio::ip::tcp::endpoint endpoint(address, port);
+		acceptor_.open(endpoint.protocol());
+		acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
+		acceptor_.bind(endpoint);
+		acceptor_.listen();
+		acceptor_.async_accept(socket_,
+				boost::bind(&RPCServerTest::OnAsyncAccept, this,
+						boost::asio::placeholders::error));
+	}
+
+	void OnAsyncAccept(const boost::system::error_code& err)
+	{
+	}
+
+	void Start()
+	{
+		IntPtr size(new int);
+		boost::asio::async_read(socket_,
+				boost::asio::buffer(reinterpret_cast<char*>(size.get()), sizeof(int)),
+				boost::bind(&RPCCommunicator::RecvMessage, this, size,
+						boost::asio::placeholders::error));
+	}
+
+	virtual void OnRecvMessage(StringPtr ss)
+	{
+		recv_string_ = ss;
+	}
+	virtual void OnSendMessage(int32 id, RpcHandlerPtr handler)
+	{
+		send_id_ = id;
+		send_handler_ = handler;
+	}
+};
+
+class RPCClientTest: public RPCCommunicator
+{
+public:
+	StringPtr recv_string_;
+	int send_id_;
+	RpcHandlerPtr send_handler_;
+
+	boost::asio::io_service& io_service_;
+
+public:
+	RPCClientTest(boost::asio::io_service& io_service, std::string& host, int port):
+		io_service_(io_service)
+	{
+		boost::asio::ip::address address;
+		address.from_string(host);
+		boost::asio::ip::tcp::endpoint endpoint(address, port_);
+		socket_.async_connect(endpoint,
+				boost::bind(&RPCClientTest::OnAsyncConnect, this,
+						boost::asio::placeholders::error));
+	}
+
+	void Start()
+	{
+		io_service_.run();
+	}
+
+	void OnAsyncConnect()
+	{
+	}
+
+	void SendString()
+	{
+
+		req->set_request(request->SerializeAsString());
+		RpcHandlerPtr handler(new RpcHandler(controller, response, done));
+		SendSize(req, handler);
+	}
+
+	virtual void OnRecvMessage(StringPtr ss)
+	{
+		recv_string_ = ss;
+	}
+
+	virtual void OnSendMessage(int32 id, RpcHandlerPtr handler)
+	{
+		send_id_ = id;
+		send_handler_ = handler;
+	}
+};
+
+class RPCCommunicatorTest: public testing::Test
+{
+protected:
+	RPCServerTest rpc_server_;
+	RPCClientTest rpc_client_;
+};
+
+
+TEST_F(RPCCommunicatorTest, CallMethod)
+{
+	RpcServerTest server(io_service_, port);
+	ASSERT_EQ(0, server.size);
+
+	RpcChannel channel(io_service_, "localhost", port);
+	channel.CallMethod(NULL, NULL, request, response_, done_);
+
+	ASSERT_EQ(request.ByteSize(), server.size);
+}
+
+} // namespace Egametang

+ 18 - 24
Src/Egametang/Rpc/RpcChannel.cc

@@ -5,8 +5,7 @@
 
 namespace Egametang {
 
-RpcChannel::RpcChannel(
-		boost::asio::io_service& io_service, std::string& host, int port):
+RpcChannel::RpcChannel(boost::asio::io_service& io_service, std::string& host, int port):
 		io_service_(io_service)
 {
 	// another thread?
@@ -14,29 +13,22 @@ RpcChannel::RpcChannel(
 	address.from_string(host);
 	boost::asio::ip::tcp::endpoint endpoint(address, port);
 	socket_.async_connect(endpoint,
-			boost::bind(&RpcChannel::AsyncConnectHandler, this,
+			boost::bind(&RpcChannel::OnAsyncConnect, this,
 					boost::asio::placeholders::error));
 }
 
-void RpcChannel::AsyncConnectHandler(const boost::system::error_code& err)
+void RpcChannel::OnAsyncConnect(const boost::system::error_code& err)
 {
 	if (err)
 	{
-		LOG(ERROR) << "async connect failed";
+		LOG(ERROR) << "async connect failed: " << err.message();
 		return;
 	}
-	RecvMessegeSize();
+	RecvSize();
 }
 
-void RpcChannel::OnRecvMessage(
-		StringPtr ss, const boost::system::error_code& err)
+void RpcChannel::OnRecvMessage(StringPtr ss)
 {
-	if (err)
-	{
-		LOG(ERROR) << "receive response failed";
-		return;
-	}
-
 	RpcResponse response;
 	Response->ParseFromString(*ss);
 	RpcHandlerPtr handler = handlers_[response.id()];
@@ -50,18 +42,18 @@ void RpcChannel::OnRecvMessage(
 	handlers_.erase(response.id());
 
 	// read size
-	RecvMessegeSize();
+	RecvSize();
 }
 
-void RpcChannel::OnSendMessage(int32 id, RpcHandlerPtr handler,
-		const boost::system::error_code& err)
+void RpcChannel::OnSendMessage()
 {
-	if (err)
-	{
-		LOG(ERROR) << "SendMessage error:";
-		return;
-	}
-	handlers_[id] = handler;
+}
+
+void RpcChannel::SendRequest(RpcRequestPtr request)
+{
+	int size = request->ByteSize();
+	std::string message = request->SerializeAsString();
+	SendSize(size, message);
 }
 
 void RpcChannel::CallMethod(
@@ -76,7 +68,9 @@ void RpcChannel::CallMethod(
 	req->set_method(method->full_name());
 	req->set_request(request->SerializeAsString());
 	RpcHandlerPtr handler(new RpcHandler(controller, response, done));
-	SendMessageSize(req, handler);
+	handlers_[id_] = handler;
+
+	SendRequest(request);
 }
 
 } // namespace Egametang

+ 4 - 7
Src/Egametang/Rpc/RpcChannel.h

@@ -23,14 +23,11 @@ private:
 	RpcCallbackMap handlers_;
 	boost::asio::io_service& io_service_;
 
-	void AsyncConnectHandler(const boost::system::error_code& err);
-
+	void OnAsyncConnect(const boost::system::error_code& err);
+	void SendRequest(RpcRequestPtr request);
 	// recieve response
-	virtual void OnRecvMessage(StringPtr ss, const boost::system::error_code& err);
-
-	// send request
-	virtual void OnSendMessage(int32 id, RpcHandlerPtr handler,
-			const boost::system::error_code& err);
+	virtual void OnRecvMessage(StringPtr ss);
+	virtual void OnSendMessage(int32 id, RpcHandlerPtr handler);
 
 public:
 	RpcChannel(boost::asio::io_service& service, std::string& host, int port);

+ 2 - 1
Src/Egametang/Rpc/RpcServer.cc

@@ -22,13 +22,14 @@ RpcServer::RpcServer(boost::asio::io_service& io_service, int port, ThreadPool&
 	RpcSessionPtr new_session(new RpcSession(sessions_));
 	acceptor_.async_accept(new_session->socket(),
 			boost::bind(&RpcServer::OnAsyncAccept, this,
-					boost::asio::placeholders::error));
+					new_session, boost::asio::placeholders::error));
 }
 
 void RpcServer::OnAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err)
 {
 	if (err)
 	{
+		LOG(ERROR) << "accept fail: " << err.message();
 		return;
 	}
 	session->Start();

+ 17 - 23
Src/Egametang/Rpc/RpcSession.cc

@@ -6,45 +6,39 @@ RpcSession::RpcSession(RpcServer& rpc_server): rpc_server_(rpc_server)
 {
 }
 
-boost::asio::ip::tcp::socket& RpcSession::Socket()
+void RpcSession::OnSendMessage(int32 id, RpcHandlerPtr handler)
 {
-	return socket_;
 }
 
-void RpcSession::OnSendMessage(int32 id, RpcHandlerPtr handler,
-		const boost::system::error_code& err)
+void RpcSession::OnRecvMessage(StringPtr ss)
 {
-	if (err)
-	{
-		LOG(ERROR) << "SendMessage error:";
-		return;
-	}
-}
-
-void RpcSession::OnRecvMessage(StringPtr ss, const boost::system::error_code& err)
-{
-	if (err)
-	{
-		LOG(ERROR) << "receive request message failed";
-		return;
-	}
-
 	RpcRequestPtr request(new RpcRequest);
 	request->ParseFromString(*ss);
 
+	int size = request->ByteSize();
+	std::string message = request->SerializeAsString();
+
 	RpcResponsePtr response(new RpcResponse);
-	response->set_id(request->id_());
+	response->set_id(request->id());
 
 	rpc_server_.RunService(shared_from_this(), request,
-			boost::bind(&RPCCommunicator::SendMessegeSize, shared_from_this(), response));
+			boost::bind(&RpcSession::SendResponse,
+					shared_from_this(), response));
 
 	// read size
-	RecvMessegeSize();
+	RecvSize();
+}
+
+void RpcSession::SendResponse(RpcResponsePtr response)
+{
+	int size = response->ByteSize();
+	std::string message = response->SerializeAsString();
+	SendSize(size, message);
 }
 
 void RpcSession::Start()
 {
-	RecvMessegeSize();
+	RecvSize();
 }
 
 void RpcSession::Stop()

+ 1 - 1
Src/Egametang/Rpc/RpcSession.h

@@ -18,6 +18,7 @@ class RpcSession:
 private:
 	RpcServer& rpc_server_;
 
+	void SendResponse(RpcResponsePtr response);
 	virtual void OnRecvMessage(StringPtr ss, const boost::system::error_code& err);
 	virtual void OnSendMessage(int32 id, RpcHandlerPtr handler,
 			const boost::system::error_code& err);
@@ -25,7 +26,6 @@ private:
 public:
 	RpcSession(RpcServer& server);
 	~RpcSession();
-	boost::asio::ip::tcp::socket& Socket();
 	void Start();
 	void Stop();
 };