tanghai 14 лет назад
Родитель
Сommit
bed74f28b5

+ 46 - 12
Src/Egametang/Rpc/RpcServer.cc

@@ -1,6 +1,7 @@
 #include <boost/asio.hpp>
 #include <boost/foreach.hpp>
 #include <google/protobuf/service.h>
+#include <glog/logging.h>
 #include "Rpc/RpcTypedef.h"
 #include "Rpc/RpcServer.h"
 #include "Rpc/RpcSession.h"
@@ -9,8 +10,8 @@
 
 namespace Egametang {
 
-RpcServer::RpcServer(boost::asio::io_service& io_service, int port, ThreadPool& thread_pool):
-		io_service_(io_service), thread_pool_(thread_pool)
+RpcServer::RpcServer(boost::asio::io_service& io_service, int port):
+		io_service_(io_service), thread_pool_()
 {
 	boost::asio::ip::address address;
 	address.from_string("localhost");
@@ -40,10 +41,23 @@ void RpcServer::OnAsyncAccept(RpcSessionPtr session, const boost::system::error_
 					boost::asio::placeholders::error));
 }
 
-void RpcServer::Callback(RpcSessionPtr session,
-		boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler)
+ThreadPool& RpcServer::ThreadPool()
 {
-	session->Socket().get_io_service().post(handler);
+	return thread_pool_;
+}
+
+void RpcServer::Callback(RpcSessionPtr session, CallMethodBackPtr call_method_back)
+{
+	RpcMetaPtr meta = call_method_back->meta;
+	StringPtr message(new std::string);
+	google::protobuf::Message* request = call_method_back->request;
+	google::protobuf::Message* response = call_method_back->response;
+	response->SerializeToString(message.get());
+	meta->size = message->size();
+
+	session->Socket().get_io_service().post(
+			boost::bind(&CallMethodBack::Run, call_method_back,
+					meta, response));
 }
 
 void RpcServer::Stop()
@@ -56,19 +70,39 @@ void RpcServer::Stop()
 	sessions_.clear();
 }
 
-void RpcServer::RunService(RpcSessionPtr session, RpcRequestPtr request,
-		boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler)
+void RpcServer::RunService(RpcSessionPtr session, RpcMetaPtr meta,
+		StringPtr message, SendResponseHandler handler)
 {
+	MethodInfoPtr method_info = methods_[meta->method];
+	const google::protobuf::MethodDescriptor* method = method_info->method_descriptor;
+	// 这两个Message在CallMethodBack里面delete
+	google::protobuf::Message* request = method_info->request_prototype->New();
+	google::protobuf::Message* response = method_info->response_prototype->New();
+	request->ParseFromString(*message);
+
+	CallMethodBackPtr call_method_back(new CallMethodBack(request, response, meta, handler));
 	google::protobuf::Closure* done = google::protobuf::NewCallback(
-			this, &RpcServer::Callback, session, handler);
+			shared_from_this(), &RpcServer::Callback,
+			session, call_method_back);
+
 	thread_pool_.Schedule(
-			boost::bind(&google::protobuf::Service::CallMethod, &service_,
-					method, NULL, request.get(), done));
+			boost::bind(&google::protobuf::Service::CallMethod, shared_from_this(),
+					method, NULL, request, response, done));
 }
 
-void RpcServer::RegisterService(ProtobufServicePtr service)
+void RpcServer::RegisterService(RpcServicePtr service)
 {
-
+	boost::hash<std::string> string_hash;
+	const google::protobuf::ServiceDescriptor* service_descriptor = service->GetDescriptor();
+	for (int i = 0; i < service_descriptor->method_count(); ++i)
+	{
+		const google::protobuf::MethodDescriptor* method_descriptor =
+				service_descriptor->method(i);
+		std::size_t method_hash = string_hash(method_descriptor->full_name());
+		MethodInfoPtr method_info(new MethodInfo(service, method_descriptor));
+		CHECK(methods_.find(method_hash) == methods_.end());
+		methods_[method_hash] = method_info;
+	}
 }
 
 void RpcServer::RemoveSession(RpcSessionPtr& session)

+ 52 - 9
Src/Egametang/Rpc/RpcServer.h

@@ -1,33 +1,76 @@
 #ifndef RPC_RPC_SERVER_H
 #define RPC_RPC_SERVER_H
 #include <boost/asio.hpp>
+#include <boost/function.hpp>
 #include "Rpc/RpcTypedef.h"
 
 namespace Egametang {
 
+struct MethodInfo
+{
+	RpcServicePtr service;
+	const google::protobuf::MethodDescriptor* method_descriptor;
+	google::protobuf::Message* request_prototype;
+	google::protobuf::Message* response_prototype;
+	MethodInfo(RpcServicePtr service, const google::protobuf::MethodDescriptor* method_descriptor):
+		service(service), method_descriptor(method_descriptor)
+	{
+		request_prototype = &service->GetRequestPrototype(method_descriptor);
+		response_prototype = &service->GetResponsePrototype(method_descriptor);
+	}
+};
+
+struct CallMethodBack
+{
+	google::protobuf::Message* request;
+	google::protobuf::Message* response;
+	RpcMetaPtr meta;
+	SendResponseHandler send_response;
+
+	CallMethodBack(
+			google::protobuf::Message* request, google::protobuf::Message* response,
+			RpcMetaPtr meta, SendResponseHandler& send_response):
+				request(request), response(response),
+				meta(meta), send_response(send_response)
+	{
+	}
+
+	~CallMethodBack()
+	{
+		delete request;
+		delete response;
+	}
+
+	void Run()
+	{
+		send_response(meta, response);
+	}
+};
+
 class RpcServer: public boost::enable_shared_from_this<RpcServer>
 {
 private:
-	friend class RpcSession;
 	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
+	typedef boost::unordered_map<std::size_t, MethodInfoPtr> MethodMap;
 
-	google::protobuf::Service& service_;
 	boost::asio::io_service& io_service_;
 	boost::asio::ip::tcp::acceptor acceptor_;
-	ThreadPool& thread_pool_;
+	ThreadPool thread_pool_;
 	RpcSessionSet sessions_;
+	MethodMap methods_;
 
 	void OnAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err);
-	void Callback(RpcSessionPtr session,
-			boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler);
+	void Callback(RpcSessionPtr session, CallMethodBackPtr call_method_back);
 
 public:
-	RpcServer(boost::asio::io_service& io_service, int port, ThreadPool& thread_pool);
+	RpcServer(boost::asio::io_service& io_service, int port);
 	~RpcServer();
 
-	void RunService(RpcSessionPtr session, RpcRequestPtr request,
-			boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler);
-	void RegisterService(ProtobufServicePtr service);
+	ThreadPool& ThreadPool();
+
+	void RunService(RpcSessionPtr session, RpcMetaPtr meta,
+			StringPtr message, SendResponseHandler handler);
+	void RegisterService(RpcServicePtr service);
 	void RemoveSession(RpcSessionPtr& session);
 	void Stop();
 };

+ 26 - 17
Src/Egametang/Rpc/RpcSession.cc

@@ -7,38 +7,47 @@ RpcSession::RpcSession(RpcServer& rpc_server):
 {
 }
 
-void RpcSession::OnRecvMessage(StringPtr ss)
+void RpcSession::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
 {
-	RpcRequestPtr request(new RpcRequest);
-	request->ParseFromString(*ss);
+	RpcMetaPtr send_meta(new RpcMeta());
+	StringPtr send_message(new std::string);
+	send_meta->id = meta->id;
 
-	int size = request->ByteSize();
-	std::string message = request->SerializeAsString();
+	google::protobuf::Message* request;
+	request->ParseFromString(*message);
 
-	RpcResponsePtr response(new RpcResponse);
-	response->set_id(request->id());
+	google::protobuf::Closure* done = google::protobuf::NewCallback(
+			this, &RpcServer::Callback, shared_from_this(), handler);
+	const google::protobuf::MethodDescriptor* method = NULL;
 
-	rpc_server_.RunService(shared_from_this(), request,
-			boost::bind(&RpcSession::SendResponse,
-					shared_from_this(), response));
+
+
+	rpc_server_.thread_pool_.Schedule(
+			boost::bind(&RpcSession::SendResponse, shared_from_this(),
+					send_meta, send_message));
+
+	rpc_server_.RunService(meta, message,
+			boost::bind(&RpcSession::SendResponse, shared_from_this(), _1, _2));
 	// read size
-	RecvSize();
+	RpcMetaPtr recv_meta(new RpcMeta());
+	StringPtr recv_message(new std::string);
+	RecvMeta(recv_meta, recv_message);
 }
 
-void RpcSession::OnSendMessage()
+void RpcSession::OnSendMessage(RpcMetaPtr meta, StringPtr message)
 {
 }
 
-void RpcSession::SendResponse(RpcResponsePtr response)
+void RpcSession::SendResponse(RpcMetaPtr meta, StringPtr message)
 {
-	int size = response->ByteSize();
-	std::string message = response->SerializeAsString();
-	SendSize(size, message);
+	SendMeta(meta, message);
 }
 
 void RpcSession::Start()
 {
-	RecvSize();
+	RpcMetaPtr meta(new RpcMeta());
+	StringPtr message(new std::string);
+	RecvMeta(meta, message);
 }
 
 void RpcSession::Stop()

+ 3 - 3
Src/Egametang/Rpc/RpcSession.h

@@ -18,10 +18,10 @@ class RpcSession:
 private:
 	RpcServer& rpc_server_;
 
-	void SendResponse(RpcResponsePtr response);
+	void SendResponse(RpcMetaPtr meta, StringPtr message);
 
-	virtual void OnRecvMessage(StringPtr ss);
-	virtual void OnSendMessage();
+	virtual void OnRecvMessage(RpcMetaPtr meta, StringPtr message);
+	virtual void OnSendMessage(RpcMetaPtr meta, StringPtr message);
 
 public:
 	RpcSession(RpcServer& server);

+ 18 - 10
Src/Egametang/Rpc/RpcTypedef.h

@@ -6,22 +6,30 @@
 namespace Egametang {
 
 // google
-typedef boost::shared_ptr<google::protobuf::Service> ProtobufServicePtr;
-typedef boost::shared_ptr<google::protobuf::Message> ProtobufMessagePtr;
+typedef boost::shared_ptr<google::protobuf::Service> RpcServicePtr;
+typedef boost::shared_ptr<google::protobuf::Message> RpcMessagePtr;
 
 // rpc
+class RpcServer;
 class RpcSession;
-class RpcRequest;
 class RpcChannel;
 class RpcHandler;
-class RpcResponse;
+class MethodInfo;
 class RpcMeta;
-typedef boost::shared_ptr<RpcSession>  RpcSessionPtr;
-typedef boost::shared_ptr<RpcRequest>  RpcRequestPtr;
-typedef boost::shared_ptr<RpcChannel>  RpcChannelPtr;
-typedef boost::shared_ptr<RpcHandler>  RpcHandlerPtr;
-typedef boost::shared_ptr<RpcResponse> RpcResponsePtr;
-typedef boost::shared_ptr<RpcMeta> 	   RpcMetaPtr;
+class CallMethodBack;
+
+typedef boost::shared_ptr<RpcServer> 	    RpcServerPtr;
+typedef boost::shared_ptr<RpcSession>       RpcSessionPtr;
+typedef boost::shared_ptr<RpcChannel>       RpcChannelPtr;
+typedef boost::shared_ptr<RpcHandler>       RpcHandlerPtr;
+typedef boost::shared_ptr<MethodInfo>       MethodInfoPtr;
+typedef boost::shared_ptr<RpcMeta> 	        RpcMetaPtr;
+typedef boost::shared_ptr<CallMethodBack> CallMethodBackPtr;
+
+
+typedef boost::weak_ptr<RpcServer>     RpcServerWPtr;
+
+typedef boost::function<void (std::size_t, google::protobuf::Message*)> SendResponseHandler;
 
 } // namespace Egametang