Просмотр исходного кода

把原来的RpcHandler改成了RequestHandler,增加了ResponsHandler用于服
务器端CallMethod的回调.ResponsHandler作用是从逻辑线程回到网络线程发送
Response包

tanghai 14 лет назад
Родитель
Сommit
8b87d25bb4

+ 1 - 1
Src/Egametang/Rpc/CMakeLists.txt

@@ -5,7 +5,7 @@ PROTOBUF_GENERATE_CPP(proto_srcs proto_hdrs
 SET(RpcSrc 
 	RpcCommunicator.cc
 	RpcController.cc
-	RpcHandler.cc
+	RequestHandler.cc
 	RpcChannel.cc
 	${proto_srcs}
 )

+ 24 - 0
Src/Egametang/Rpc/RequestHandler.cc

@@ -0,0 +1,24 @@
+#include "Rpc/RequestHandler.h"
+
+namespace Egametang {
+
+RequestHandler::RequestHandler(
+		google::protobuf::Message* response, google::protobuf::Closure* done):
+		response(response), done(done)
+{
+}
+
+google::protobuf::Message *RequestHandler::Response() const
+{
+    return response;
+}
+
+void RequestHandler::Run()
+{
+	if (done)
+	{
+		done->Run();
+	}
+}
+
+} // namespace Egametang

+ 26 - 0
Src/Egametang/Rpc/RequestHandler.h

@@ -0,0 +1,26 @@
+#ifndef RPC_REQUESTHANDLER_H
+#define RPC_REQUESTHANDLER_H
+
+#include <google/protobuf/service.h>
+#include <google/protobuf/message.h>
+
+namespace Egametang {
+
+class RequestHandler
+{
+private:
+	google::protobuf::Message* response;
+	google::protobuf::Closure* done;
+public:
+	RequestHandler(
+			google::protobuf::Message* response,
+			google::protobuf::Closure* done);
+
+    google::protobuf::Message *Response() const;
+
+    void Run();
+};
+
+} // namespace Egametang
+
+#endif // RPC_REQUESTHANDLER_H

+ 10 - 16
Src/Egametang/Rpc/RpcChannel.cc

@@ -5,7 +5,7 @@
 #include <google/protobuf/descriptor.h>
 #include "Rpc/RpcCommunicator.h"
 #include "Rpc/RpcChannel.h"
-#include "Rpc/RpcHandler.h"
+#include "Rpc/RequestHandler.h"
 
 namespace Egametang {
 
@@ -39,26 +39,20 @@ void RpcChannel::OnAsyncConnect(const boost::system::error_code& err)
 
 void RpcChannel::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
 {
-	RpcHandlerPtr handler = handlers_[meta->id];
-	handler->GetResponse()->ParseFromString(*message);
+	RequestHandlerPtr request_handler = request_handlers_[meta->id];
+	request_handlers_.erase(meta->id);
 
-	handlers_.erase(meta->id);
-
-	// read size
-	RpcMetaPtr recv_meta(new RpcMeta());
-	StringPtr recv_message(new std::string);
-	RecvMeta(recv_meta, recv_message);
+	request_handler->Response()->ParseFromString(*message);
 
+	// meta和message可以循环利用
+	RecvMeta(meta, message);
 	// 回调放在函数最后.如果RecvMeta()放在回调之后,
 	// 另外线程可能让io_service stop,导致RecvMeta还未跑完
 	// 网络就终止了
-	if (handler->GetDone() != NULL)
-	{
-		handler->GetDone()->Run();
-	}
+	request_handler->Run();
 }
 
-void RpcChannel::OnSendMessage()
+void RpcChannel::OnSendMessage(RpcMetaPtr meta, StringPtr message)
 {
 }
 
@@ -74,8 +68,8 @@ void RpcChannel::CallMethod(
 		google::protobuf::Message* response,
 		google::protobuf::Closure* done)
 {
-	RpcHandlerPtr handler(new RpcHandler(controller, response, done));
-	handlers_[++id_] = handler;
+	RequestHandlerPtr request_handler(new RequestHandler(response, done));
+	request_handlers_[++id_] = request_handler;
 
 	boost::hash<std::string> string_hash;
 

+ 4 - 6
Src/Egametang/Rpc/RpcChannel.h

@@ -12,20 +12,18 @@ namespace Egametang {
 
 class RpcHandler;
 
-class RpcChannel:
-		public google::protobuf::RpcChannel,
-		public RpcCommunicator
+class RpcChannel: public google::protobuf::RpcChannel, public RpcCommunicator
 {
 private:
-	typedef boost::unordered_map<std::size_t, RpcHandlerPtr> RpcCallbackMap;
+	typedef boost::unordered_map<std::size_t, RequestHandlerPtr> RequestHandlerMap;
 
 	std::size_t id_;
-	RpcCallbackMap handlers_;
+	RequestHandlerMap request_handlers_;
 
 	void OnAsyncConnect(const boost::system::error_code& err);
 
 	virtual void OnRecvMessage(RpcMetaPtr meta, StringPtr message);
-	virtual void OnSendMessage();
+	virtual void OnSendMessage(RpcMetaPtr meta, StringPtr message);
 
 public:
 	RpcChannel(boost::asio::io_service& service, std::string host, int port);

+ 0 - 27
Src/Egametang/Rpc/RpcHandler.cc

@@ -1,27 +0,0 @@
-#include "Rpc/RpcHandler.h"
-
-namespace Egametang {
-
-RpcHandler::RpcHandler(google::protobuf::RpcController* controller,
-		google::protobuf::Message* response,
-		google::protobuf::Closure* done):
-		controller_(controller), response_(response), done_(done)
-{
-}
-
-google::protobuf::RpcController *RpcHandler::GetController() const
-{
-    return controller_;
-}
-
-google::protobuf::Closure *RpcHandler::GetDone() const
-{
-    return done_;
-}
-
-google::protobuf::Message *RpcHandler::GetResponse() const
-{
-    return response_;
-}
-
-} // namespace Egametang

+ 0 - 26
Src/Egametang/Rpc/RpcHandler.h

@@ -1,26 +0,0 @@
-#ifndef RPC_RPCHANDLER_H
-#define RPC_RPCHANDLER_H
-
-#include <google/protobuf/service.h>
-#include <google/protobuf/message.h>
-
-namespace Egametang {
-
-class RpcHandler
-{
-private:
-	google::protobuf::RpcController* controller_;
-	google::protobuf::Message* response_;
-	google::protobuf::Closure* done_;
-public:
-	RpcHandler(google::protobuf::RpcController* controller,
-			google::protobuf::Message* response,
-			google::protobuf::Closure* done);
-    google::protobuf::RpcController *GetController() const;
-    google::protobuf::Closure *GetDone() const;
-    google::protobuf::Message *GetResponse() const;
-};
-
-} // namespace Egametang
-
-#endif // RPC_RPCHANDLER_H

+ 23 - 27
Src/Egametang/Rpc/RpcServer.cc

@@ -2,11 +2,13 @@
 #include <boost/foreach.hpp>
 #include <google/protobuf/service.h>
 #include <glog/logging.h>
+#include "Base/Marcos.h"
 #include "Rpc/RpcTypedef.h"
 #include "Rpc/RpcServer.h"
 #include "Rpc/RpcSession.h"
+#include "Rpc/ResponseHandler.h"
+#include "Rpc/MethodInfo.h"
 #include "Thread/ThreadPool.h"
-#include "Base/Marcos.h"
 
 namespace Egametang {
 
@@ -26,6 +28,11 @@ RpcServer::RpcServer(boost::asio::io_service& io_service, int port):
 					new_session, boost::asio::placeholders::error));
 }
 
+boost::asio::io_service& RpcServer::IOService()
+{
+	return io_service_;
+}
+
 void RpcServer::OnAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err)
 {
 	if (err)
@@ -41,27 +48,16 @@ void RpcServer::OnAsyncAccept(RpcSessionPtr session, const boost::system::error_
 					boost::asio::placeholders::error));
 }
 
-ThreadPool& RpcServer::ThreadPool()
+void RpcServer::OnCallMethod(RpcSessionPtr session, ResponseHandlerPtr response_handler)
 {
-	return thread_pool_;
-}
-
-void RpcServer::Callback(RpcSessionPtr session, CallMethodBackPtr call_method_back)
-{
-	RpcMetaPtr meta = call_method_back->meta;
-	StringPtr message(new std::string);
-	google::protobuf::Message* request = call_method_back->request;
-	google::protobuf::Message* response = call_method_back->response;
-	response->SerializeToString(message.get());
-	meta->size = message->size();
-
+	// push到网络线程
 	session->Socket().get_io_service().post(
-			boost::bind(&CallMethodBack::Run, call_method_back,
-					meta, response));
+			boost::bind(&ResponseHandler::Run, response_handler));
 }
 
 void RpcServer::Stop()
 {
+	thread_pool_.Wait();
 	acceptor_.close();
 	foreach(RpcSessionPtr session, sessions_)
 	{
@@ -71,23 +67,23 @@ void RpcServer::Stop()
 }
 
 void RpcServer::RunService(RpcSessionPtr session, RpcMetaPtr meta,
-		StringPtr message, SendResponseHandler handler)
+		StringPtr message, MessageHandler message_handler)
 {
 	MethodInfoPtr method_info = methods_[meta->method];
-	const google::protobuf::MethodDescriptor* method = method_info->method_descriptor;
-	// 这两个Message在CallMethodBack里面delete
-	google::protobuf::Message* request = method_info->request_prototype->New();
-	google::protobuf::Message* response = method_info->response_prototype->New();
-	request->ParseFromString(*message);
 
-	CallMethodBackPtr call_method_back(new CallMethodBack(request, response, meta, handler));
+	ResponseHandlerPtr response_handler(
+			new ResponseHandler(method_info, meta->id, message_handler));
+	response_handler->Request()->ParseFromString(*message);
+
 	google::protobuf::Closure* done = google::protobuf::NewCallback(
-			shared_from_this(), &RpcServer::Callback,
-			session, call_method_back);
+			shared_from_this(), &RpcServer::OnCallMethod,
+			session, response_handler);
 
 	thread_pool_.Schedule(
-			boost::bind(&google::protobuf::Service::CallMethod, shared_from_this(),
-					method, NULL, request, response, done));
+			boost::bind(&google::protobuf::Service::CallMethod, this,
+					response_handler->Method(), NULL,
+					response_handler->Request(), response_handler->Response(),
+					done));
 }
 
 void RpcServer::RegisterService(RpcServicePtr service)

+ 8 - 50
Src/Egametang/Rpc/RpcServer.h

@@ -1,53 +1,12 @@
-#ifndef RPC_RPC_SERVER_H
-#define RPC_RPC_SERVER_H
+#ifndef RPC_RPCSERVER_H
+#define RPC_RPCSERVER_H
 #include <boost/asio.hpp>
 #include <boost/function.hpp>
 #include "Rpc/RpcTypedef.h"
 
 namespace Egametang {
 
-struct MethodInfo
-{
-	RpcServicePtr service;
-	const google::protobuf::MethodDescriptor* method_descriptor;
-	google::protobuf::Message* request_prototype;
-	google::protobuf::Message* response_prototype;
-	MethodInfo(RpcServicePtr service, const google::protobuf::MethodDescriptor* method_descriptor):
-		service(service), method_descriptor(method_descriptor)
-	{
-		request_prototype = &service->GetRequestPrototype(method_descriptor);
-		response_prototype = &service->GetResponsePrototype(method_descriptor);
-	}
-};
-
-struct CallMethodBack
-{
-	google::protobuf::Message* request;
-	google::protobuf::Message* response;
-	RpcMetaPtr meta;
-	SendResponseHandler send_response;
-
-	CallMethodBack(
-			google::protobuf::Message* request, google::protobuf::Message* response,
-			RpcMetaPtr meta, SendResponseHandler& send_response):
-				request(request), response(response),
-				meta(meta), send_response(send_response)
-	{
-	}
-
-	~CallMethodBack()
-	{
-		delete request;
-		delete response;
-	}
-
-	void Run()
-	{
-		send_response(meta, response);
-	}
-};
-
-class RpcServer: public boost::enable_shared_from_this<RpcServer>
+class RpcServer: public google::protobuf::Service, boost::enable_shared_from_this<RpcServer>
 {
 private:
 	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
@@ -60,16 +19,15 @@ private:
 	MethodMap methods_;
 
 	void OnAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err);
-	void Callback(RpcSessionPtr session, CallMethodBackPtr call_method_back);
+	void OnCallMethod(RpcSessionPtr session, ResponseHandlerPtr call_method_back);
 
 public:
 	RpcServer(boost::asio::io_service& io_service, int port);
-	~RpcServer();
-
-	ThreadPool& ThreadPool();
+	virtual ~RpcServer();
 
+	boost::asio::io_service& IOService();
 	void RunService(RpcSessionPtr session, RpcMetaPtr meta,
-			StringPtr message, SendResponseHandler handler);
+			StringPtr message, MessageHandler handler);
 	void RegisterService(RpcServicePtr service);
 	void RemoveSession(RpcSessionPtr& session);
 	void Stop();
@@ -77,4 +35,4 @@ public:
 
 } // namespace Egametang
 
-#endif // RPC_RPC_SERVER_H
+#endif // RPC_RPCSERVER_H

+ 7 - 29
Src/Egametang/Rpc/RpcSession.cc

@@ -1,48 +1,26 @@
+#include <boost/bind.hpp>
 #include "Rpc/RpcSession.h"
 
 namespace Egametang {
 
 RpcSession::RpcSession(RpcServer& rpc_server):
-		rpc_server_(rpc_server), RpcCommunicator(rpc_server_.io_service_)
+		rpc_server_(rpc_server), RpcCommunicator(rpc_server_.IOService())
 {
 }
 
 void RpcSession::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
 {
-	RpcMetaPtr send_meta(new RpcMeta());
-	StringPtr send_message(new std::string);
-	send_meta->id = meta->id;
+	rpc_server_.RunService(shared_from_this(), meta, message,
+			boost::bind(&RpcSession::SendMeta, shared_from_this(), _1, _2));
 
-	google::protobuf::Message* request;
-	request->ParseFromString(*message);
-
-	google::protobuf::Closure* done = google::protobuf::NewCallback(
-			this, &RpcServer::Callback, shared_from_this(), handler);
-	const google::protobuf::MethodDescriptor* method = NULL;
-
-
-
-	rpc_server_.thread_pool_.Schedule(
-			boost::bind(&RpcSession::SendResponse, shared_from_this(),
-					send_meta, send_message));
-
-	rpc_server_.RunService(meta, message,
-			boost::bind(&RpcSession::SendResponse, shared_from_this(), _1, _2));
-	// read size
-	RpcMetaPtr recv_meta(new RpcMeta());
-	StringPtr recv_message(new std::string);
-	RecvMeta(recv_meta, recv_message);
+	// 可以循环利用
+	RecvMeta(meta, message);
 }
 
 void RpcSession::OnSendMessage(RpcMetaPtr meta, StringPtr message)
 {
 }
 
-void RpcSession::SendResponse(RpcMetaPtr meta, StringPtr message)
-{
-	SendMeta(meta, message);
-}
-
 void RpcSession::Start()
 {
 	RpcMetaPtr meta(new RpcMeta());
@@ -52,7 +30,7 @@ void RpcSession::Start()
 
 void RpcSession::Stop()
 {
-	socket_.close();
+	RpcCommunicator::Stop();
 	rpc_server_.RemoveSession(shared_from_this());
 }
 

+ 3 - 5
Src/Egametang/Rpc/RpcSession.h

@@ -1,5 +1,5 @@
-#ifndef RPC_RPC_SESSION_H
-#define RPC_RPC_SESSION_H
+#ifndef RPC_RPCSESSION_H
+#define RPC_RPCSESSION_H
 
 #include <boost/asio.hpp>
 #include <boost/noncopyable.hpp>
@@ -18,8 +18,6 @@ class RpcSession:
 private:
 	RpcServer& rpc_server_;
 
-	void SendResponse(RpcMetaPtr meta, StringPtr message);
-
 	virtual void OnRecvMessage(RpcMetaPtr meta, StringPtr message);
 	virtual void OnSendMessage(RpcMetaPtr meta, StringPtr message);
 
@@ -32,4 +30,4 @@ public:
 
 } // namespace Egametang
 
-#endif // RPC_RPC_SESSION_H
+#endif // RPC_RPCSESSION_H

+ 16 - 17
Src/Egametang/Rpc/RpcTypedef.h

@@ -1,6 +1,7 @@
-#ifndef RPC_RPC_TYPEDEF_H
-#define RPC_RPC_TYPEDEF_H
+#ifndef RPC_RPCTYPEDEF_H
+#define RPC_RPCTYPEDEF_H
 #include <boost/shared_ptr.hpp>
+#include <boost/function.hpp>
 #include <google/protobuf/service.h>
 
 namespace Egametang {
@@ -13,23 +14,21 @@ typedef boost::shared_ptr<google::protobuf::Message> RpcMessagePtr;
 class RpcServer;
 class RpcSession;
 class RpcChannel;
-class RpcHandler;
+class RequestHandler;
 class MethodInfo;
 class RpcMeta;
-class CallMethodBack;
-
-typedef boost::shared_ptr<RpcServer> 	    RpcServerPtr;
-typedef boost::shared_ptr<RpcSession>       RpcSessionPtr;
-typedef boost::shared_ptr<RpcChannel>       RpcChannelPtr;
-typedef boost::shared_ptr<RpcHandler>       RpcHandlerPtr;
-typedef boost::shared_ptr<MethodInfo>       MethodInfoPtr;
-typedef boost::shared_ptr<RpcMeta> 	        RpcMetaPtr;
-typedef boost::shared_ptr<CallMethodBack> CallMethodBackPtr;
-
-
-typedef boost::weak_ptr<RpcServer>     RpcServerWPtr;
-
-typedef boost::function<void (std::size_t, google::protobuf::Message*)> SendResponseHandler;
+class ResponseHandler;
+
+typedef boost::shared_ptr<RpcServer> 	              RpcServerPtr;
+typedef boost::shared_ptr<RpcSession>                 RpcSessionPtr;
+typedef boost::shared_ptr<RpcChannel>                 RpcChannelPtr;
+typedef boost::shared_ptr<MethodInfo>                 MethodInfoPtr;
+typedef boost::shared_ptr<RpcMeta> 	                  RpcMetaPtr;
+typedef boost::shared_ptr<RequestHandler>             RequestHandlerPtr;
+typedef boost::shared_ptr<ResponseHandler>            ResponseHandlerPtr;
+
+typedef boost::weak_ptr<RpcServer>                    RpcServerWPtr;
+typedef boost::function<void (RpcMetaPtr, StringPtr)> MessageHandler;
 
 } // namespace Egametang