Parcourir la source

增加RpcCommunicator类用来接收发送消息

tanghai il y a 14 ans
Parent
commit
d8552604d3

+ 58 - 0
Src/Egametang/Rpc/RPCCommunicator.cc

@@ -0,0 +1,58 @@
+#include "Rpc/RpcCommunicator.h"
+
+namespace Egametang {
+
+RPCCommunicator::RPCCommunicator()
+{
+}
+
+void RPCCommunicator::RecvMessegeSize()
+{
+	IntPtr size(new int);
+	boost::asio::async_read(socket_,
+			boost::asio::buffer(
+					reinterpret_cast<char*>(size.get()), sizeof(int)),
+			boost::bind(&RPCCommunicator::RecvMessage, this, size,
+					boost::asio::placeholders::error));
+}
+
+void RPCCommunicator::RecvMessage(IntPtr size, const boost::system::error_code& err)
+{
+	if (err)
+	{
+		LOG(ERROR) << "receive response size failed";
+		return;
+	}
+	StringPtr ss;
+	boost::asio::async_read(socket_,
+			boost::asio::buffer(*ss, *size),
+			boost::bind(&RPCCommunicator::OnRecvMessage, this, ss,
+					boost::asio::placeholders::error));
+}
+
+
+void RPCCommunicator::SendMessage(const RpcRequestPtr request,
+		RpcHandlerPtr handler, const boost::system::error_code& err)
+{
+	if (err)
+	{
+		LOG(ERROR) << "SendRequestSize error:";
+		return;
+	}
+	std::string ss = request->SerializeAsString();
+	boost::asio::async_write(socket_, boost::asio::buffer(ss),
+			boost::bind(&RPCCommunicator::OnSendMessage, this, request->id(),
+					handler, boost::asio::placeholders::error));
+}
+
+void RPCCommunicator::SendMessageSize(
+		const RpcRequestPtr request, RpcHandlerPtr handler)
+{
+	int size = request->ByteSize();
+	std::string ss = boost::lexical_cast(size);
+	boost::asio::async_write(socket_, boost::asio::buffer(ss),
+			boost::bind(&RPCCommunicator::SendMessage, this, request,
+					handler, boost::asio::placeholders::error));
+}
+
+} // namespace Egametang

+ 35 - 0
Src/Egametang/Rpc/RPCCommunicator.h

@@ -0,0 +1,35 @@
+#ifndef RPC_RPC_COMMUNICATOR_H
+#define RPC_RPC_COMMUNICATOR_H
+
+#include <google/protobuf/service.h>
+#include <boost/unordered_map.hpp>
+#include <boost/asio.hpp>
+#include "Base/Typedef.h"
+#include "Rpc/RpcTypedef.h"
+
+namespace Egametang {
+
+class RPCCommunicator
+{
+protected:
+	boost::asio::ip::tcp::socket socket_;
+
+public:
+	RPCCommunicator();
+	// recieve response
+	void RecvMessegeSize();
+	void RecvMessage(IntPtr size, const boost::system::error_code& err);
+
+	// send request
+	void SendMessageSize(const RpcRequestPtr request, RpcHandlerPtr handler);
+	void SendMessage(const RpcRequestPtr request, RpcHandlerPtr handler,
+			const boost::system::error_code& err);
+
+	virtual void OnRecvMessage(StringPtr ss, const boost::system::error_code& err) = 0;
+	virtual void OnSendMessage(int32 id, RpcHandlerPtr handler,
+			const boost::system::error_code& err) = 0;
+};
+
+} // namespace Egametang
+
+#endif // RPC_RPC_COMMUNICATOR_H

+ 2 - 50
Src/Egametang/Rpc/RpcChannel.cc

@@ -28,31 +28,7 @@ void RpcChannel::AsyncConnectHandler(const boost::system::error_code& err)
 	RecvMessegeSize();
 }
 
-void RpcChannel::RecvMessegeSize()
-{
-	IntPtr size(new int);
-	boost::asio::async_read(socket_,
-			boost::asio::buffer(
-					reinterpret_cast<char*>(size.get()), sizeof(int)),
-			boost::bind(&RpcChannel::RecvMessage, this, size,
-					boost::asio::placeholders::error));
-}
-
-void RpcChannel::RecvMessage(IntPtr size, const boost::system::error_code& err)
-{
-	if (err)
-	{
-		LOG(ERROR) << "receive response size failed";
-		return;
-	}
-	StringPtr ss;
-	boost::asio::async_read(socket_,
-			boost::asio::buffer(*ss, *size),
-			boost::bind(&RpcChannel::RecvMessageHandler, this, ss,
-					boost::asio::placeholders::error));
-}
-
-void RpcChannel::RecvMessageHandler(
+void RpcChannel::OnRecvMessage(
 		StringPtr ss, const boost::system::error_code& err)
 {
 	if (err)
@@ -77,7 +53,7 @@ void RpcChannel::RecvMessageHandler(
 	RecvMessegeSize();
 }
 
-void RpcChannel::SendMessageHandler(int32 id, RpcHandlerPtr handler,
+void RpcChannel::OnSendMessage(int32 id, RpcHandlerPtr handler,
 		const boost::system::error_code& err)
 {
 	if (err)
@@ -88,30 +64,6 @@ void RpcChannel::SendMessageHandler(int32 id, RpcHandlerPtr handler,
 	handlers_[id] = handler;
 }
 
-void RpcChannel::SendMessage(const RpcRequestPtr request,
-		RpcHandlerPtr handler, const boost::system::error_code& err)
-{
-	if (err)
-	{
-		LOG(ERROR) << "SendRequestSize error:";
-		return;
-	}
-	std::string ss = request->SerializeAsString();
-	boost::asio::async_write(socket_, boost::asio::buffer(ss),
-			boost::bind(&RpcChannel::SendMessageHandler, this, request->id(),
-					handler, boost::asio::placeholders::error));
-}
-
-void RpcChannel::SendMessageSize(
-		const RpcRequestPtr request, RpcHandlerPtr handler)
-{
-	int size = request->ByteSize();
-	std::string ss = boost::lexical_cast(size);
-	boost::asio::async_write(socket_, boost::asio::buffer(ss),
-			boost::bind(&RpcChannel::SendMessage, this, request,
-					handler, boost::asio::placeholders::error));
-}
-
 void RpcChannel::CallMethod(
 		const google::protobuf::MethodDescriptor* method,
 		google::protobuf::RpcController* controller,

+ 6 - 9
Src/Egametang/Rpc/RpcChannel.h

@@ -6,12 +6,15 @@
 #include <boost/asio.hpp>
 #include "Base/Typedef.h"
 #include "Rpc/RpcTypedef.h"
+#include "Rpc/RpcCommunicator.h"
 
 namespace Egametang {
 
 class RpcHandler;
 
-class RpcChannel: public google::protobuf::RpcChannel
+class RpcChannel:
+		public google::protobuf::RpcChannel,
+		public RPCCommunicator
 {
 private:
 	typedef boost::unordered_map<int32, RpcHandlerPtr> RpcCallbackMap;
@@ -19,20 +22,14 @@ private:
 	int32 id_;
 	RpcCallbackMap handlers_;
 	boost::asio::io_service& io_service_;
-	boost::asio::ip::tcp::socket socket_;
 
 	void AsyncConnectHandler(const boost::system::error_code& err);
 
 	// recieve response
-	void RecvMessegeSize();
-	void RecvMessage(IntPtr size, const boost::system::error_code& err);
-	void RecvMessageHandler(StringPtr ss, const boost::system::error_code& err);
+	virtual void OnRecvMessage(StringPtr ss, const boost::system::error_code& err);
 
 	// send request
-	void SendMessageSize(const RpcRequestPtr request, RpcHandlerPtr handler);
-	void SendMessage(const RpcRequestPtr request,
-			RpcHandlerPtr handler, const boost::system::error_code& err);
-	void SendMessageHandler(int32 id, RpcHandlerPtr handler,
+	virtual void OnSendMessage(int32 id, RpcHandlerPtr handler,
 			const boost::system::error_code& err);
 
 public:

+ 8 - 3
Src/Egametang/Rpc/RpcServer.cc

@@ -21,11 +21,11 @@ RpcServer::RpcServer(boost::asio::io_service& io_service, int port, ThreadPool&
 	acceptor_.listen();
 	RpcSessionPtr new_session(new RpcSession(sessions_));
 	acceptor_.async_accept(new_session->socket(),
-			boost::bind(&RpcServer::HandleAsyncAccept, this,
+			boost::bind(&RpcServer::OnAsyncAccept, this,
 					boost::asio::placeholders::error));
 }
 
-void RpcServer::HandleAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err)
+void RpcServer::OnAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err)
 {
 	if (err)
 	{
@@ -35,7 +35,7 @@ void RpcServer::HandleAsyncAccept(RpcSessionPtr session, const boost::system::er
 	sessions_.insert(session);
 	RpcSessionPtr new_session(new RpcSession(*this));
 	acceptor_.async_accept(new_session->socket(),
-			boost::bind(&RpcServer::HandleAsyncAccept, this,
+			boost::bind(&RpcServer::OnAsyncAccept, this,
 					boost::asio::placeholders::error));
 }
 
@@ -75,4 +75,9 @@ void RpcServer::Start()
 	io_service_.run();
 }
 
+void RpcServer::RemoveSession(RpcSessionPtr& session)
+{
+	sessions_.erase(session);
+}
+
 } // namespace Egametang

+ 2 - 1
Src/Egametang/Rpc/RpcServer.h

@@ -16,7 +16,7 @@ private:
 	ThreadPool& thread_pool_;
 	RpcSessionSet sessions_;
 
-	void HandleAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err);
+	void OnAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err);
 	void Callback(RpcSessionPtr session,
 			boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler);
 
@@ -29,6 +29,7 @@ public:
 	void RunService(RpcSessionPtr session, RpcRequestPtr request,
 			boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler);
 	void RegisterService(ProtobufServicePtr service);
+	void RemoveSession(RpcSessionPtr& session);
 };
 
 } // namespace Egametang

+ 4 - 50
Src/Egametang/Rpc/RpcSession.cc

@@ -11,7 +11,7 @@ boost::asio::ip::tcp::socket& RpcSession::Socket()
 	return socket_;
 }
 
-void RpcSession::SendMessageHandler(int32 id, RpcHandlerPtr handler,
+void RpcSession::OnSendMessage(int32 id, RpcHandlerPtr handler,
 		const boost::system::error_code& err)
 {
 	if (err)
@@ -21,53 +21,7 @@ void RpcSession::SendMessageHandler(int32 id, RpcHandlerPtr handler,
 	}
 }
 
-void RpcSession::SendMessage(const RpcResponsePtr response, const boost::system::error_code& err)
-{
-	if (err)
-	{
-		return;
-	}
-	std::string ss = response->SerializeAsString();
-	boost::asio::async_write(socket_, boost::asio::buffer(ss),
-			boost::bind(&RpcSession::SendMessageHandler, this,
-					response->id(), boost::asio::placeholders::error));
-}
-
-void RpcSession::SendMessageSize(RpcResponsePtr response)
-{
-	int size = response->ByteSize();
-	std::string ss = boost::lexical_cast(size);
-	boost::asio::async_write(socket_, boost::asio::buffer(ss),
-			boost::bind(&RpcSession::SendMessage, this,
-					response, boost::asio::placeholders::error));
-}
-///////////////////////////
-
-void RpcSession::RecvMessegeSize()
-{
-	IntPtr size(new int);
-	boost::asio::async_read(socket_,
-			boost::asio::buffer(
-					reinterpret_cast<char*>(size.get()), sizeof(int)),
-			boost::bind(&RpcSession::RecvMessage, this, size,
-					boost::asio::placeholders::error));
-}
-
-void RpcSession::RecvMessage(IntPtr size, const boost::system::error_code& err)
-{
-	if (err)
-	{
-		LOG(ERROR) << "receive request size failed";
-		return;
-	}
-	StringPtr ss(new std::string);
-	boost::asio::async_read(socket_,
-			boost::asio::buffer(*ss, *size),
-			boost::bind(&RpcSession::RecvMessageHandler, this, ss,
-					boost::asio::placeholders::error));
-}
-
-void RpcSession::RecvMessageHandler(StringPtr ss, const boost::system::error_code& err)
+void RpcSession::OnRecvMessage(StringPtr ss, const boost::system::error_code& err)
 {
 	if (err)
 	{
@@ -82,7 +36,7 @@ void RpcSession::RecvMessageHandler(StringPtr ss, const boost::system::error_cod
 	response->set_id(request->id_());
 
 	rpc_server_.RunService(shared_from_this(), request,
-			boost::bind(&RpcSession::SendMessegeSize, shared_from_this(), response));
+			boost::bind(&RPCCommunicator::SendMessegeSize, shared_from_this(), response));
 
 	// read size
 	RecvMessegeSize();
@@ -96,7 +50,7 @@ void RpcSession::Start()
 void RpcSession::Stop()
 {
 	socket_.close();
-	sessions_.erase(shared_from_this());
+	rpc_server_.RemoveSession(shared_from_this());
 }
 
 }

+ 7 - 9
Src/Egametang/Rpc/RpcSession.h

@@ -10,19 +10,17 @@ namespace Egametang {
 
 class RpcServer;
 
-class RpcSession: private boost::noncopyable, public boost::enable_shared_from_this<RpcSession>
+class RpcSession:
+		private boost::noncopyable,
+		public RPCCommunicator,
+		public boost::enable_shared_from_this<RpcSession>
 {
 private:
-	boost::asio::ip::tcp::socket socket_;
 	RpcServer& rpc_server_;
 
-	void RecvMessegeSize();
-	void RecvMessage(IntPtr size, const boost::system::error_code& err);
-	void RecvMessageHandler(StringPtr ss, const boost::system::error_code& err);
-
-	void SendMessageSize(RpcResponsePtr response);
-	void SendMessage(const RpcResponsePtr response, const boost::system::error_code& err);
-	void SendMessageHandler(int32 id, RpcHandlerPtr handler, const boost::system::error_code& err);
+	virtual void OnRecvMessage(StringPtr ss, const boost::system::error_code& err);
+	virtual void OnSendMessage(int32 id, RpcHandlerPtr handler,
+			const boost::system::error_code& err);
 
 public:
 	RpcSession(RpcServer& server);