Przeglądaj źródła

大致完成RpcServer端编写,网络线程收到request,将包放到逻辑线程池处理,
线程池处理完成后,将response包回调到网络线程发送回RpcClient端

tanghai 15 lat temu
rodzic
commit
fd67ba9840
5 zmienionych plików z 70 dodań i 21 usunięć
  1. 13 0
      Src/Base/Typedefs.h
  2. 13 1
      Src/Net/RpcServer.cc
  3. 6 2
      Src/Net/RpcServer.h
  4. 35 10
      Src/Net/RpcSession.cc
  5. 3 8
      Src/Net/RpcSession.h

+ 13 - 0
Src/Base/Typedefs.h

@@ -2,6 +2,8 @@
 #define BASE_TYPEDEFS_H
 #include <boost/smart_ptr.hpp>
 
+namespace Hainan {
+
 typedef boost::int8_t   int8;
 typedef boost::int16_t  int16;
 typedef boost::int32_t  int32;
@@ -12,14 +14,17 @@ typedef boost::uint32_t uint32;
 typedef boost::uint64_t uint64;
 
 // smart_ptr typedef
+
 typedef boost::shared_ptr<int> IntPtr;
 typedef boost::shared_ptr<std::string> StringPtr;
 
+// boost
 namespace boost {
 class thread;
 }
 typedef boost::shared_ptr<boost::thread> ThreadPtr;
 
+// google
 namespace google {
 namespace protobuf {
 class Service;
@@ -29,4 +34,12 @@ class Message;
 typedef boost::shared_ptr<google::protobuf::Service> ProtobufServicePtr;
 typedef boost::shared_ptr<google::protobuf::Message> ProtobufMessagePtr;
 
+// Hainan
+class RpcSession;
+class RpcRequest;
+typedef boost::shared_ptr<RpcSession> RpcSessionPtr;
+typedef boost::shared_ptr<RpcRequest> RpcRequestPtr;
+
+} // namespace Hainan
+
 #endif // BASE_TYPEDEFS_H

+ 13 - 1
Src/Net/RpcServer.cc

@@ -1,5 +1,6 @@
 #include <boost/asio.hpp>
 #include <boost/foreach.hpp>
+#include <google/protobuf/service.h>
 #include "Base/Base.h"
 #include "Net/RpcServer.h"
 #include "Net/RpcSession.h"
@@ -37,9 +38,20 @@ void RpcServer::HandleAsyncAccept(RpcSessionPtr session, const boost::system::er
 					boost::asio::placeholders::error));
 }
 
-void RpcServer::RunService(boost::asio::ip::tcp::socket& socket, RpcRequestPtr request)
+void RpcServer::Callback(RpcSessionPtr session,
+		boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler)
 {
+	session->socket.get_io_service().post(handler);
+}
 
+void RpcServer::RunService(RpcSessionPtr session, RpcRequestPtr request,
+		boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler)
+{
+	google::protobuf::Closure* done = google::protobuf::NewCallback(
+			&RpcServer::Callback, shared_from_this(), session, handler);
+	thread_pool.PushTask(
+			boost::bind(&google::protobuf::Service::CallMethod, &service,
+					method, NULL, request.get(), done));
 }
 
 void RpcServer::Start()

+ 6 - 2
Src/Net/RpcServer.h

@@ -1,12 +1,12 @@
 #ifndef NET_RPC_SERVER_H
 #define NET_RPC_SERVER_H
+#include "base/base.h"
 
 namespace Hainan {
 
 class RpcServer: public boost::enable_shared_from_this<RpcServer>
 {
 private:
-	friend class RpcSession;
 	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
 	google::protobuf::Service& service;
 	boost::asio::io_service io_service;
@@ -14,13 +14,17 @@ private:
 	ThreadPool thread_pool;
 	RpcSessionSet sessions;
 
-	void RunService(boost::asio::ip::tcp::socket& socket, RpcRequestPtr request);
+	void RpcServer::Callback(RpcSessionPtr session,
+			boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler);
 
 public:
 	RpcServer(google::protobuf::Service& pservice, int port);
 	~RpcServer();
 	void Start();
 	void Stop();
+
+	void RunService(RpcSessionPtr session, RpcRequestPtr request,
+			boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler);
 };
 
 } // namespace Hainan

+ 35 - 10
Src/Net/RpcSession.cc

@@ -11,10 +11,38 @@ boost::asio::ip::tcp::socket& RpcSession::Socket()
 	return socket;
 }
 
-void RpcSession::SendMessegeSize()
+void RpcSession::SendMessageHandler(int32 id, RpcHandlerPtr handler,
+		const boost::system::error_code& err)
 {
+	if (err)
+	{
+		LOG(ERROR) << "SendMessage error:";
+		return;
+	}
+	handlers[id] = handler;
+}
 
+void RpcSession::SendMessage(const RpcResponsePtr response, const boost::system::error_code& err)
+{
+	if (err)
+	{
+		return;
+	}
+	std::string ss = response->SerializeAsString();
+	boost::asio::async_write(socket, boost::asio::buffer(ss),
+			boost::bind(&RpcSession::SendMessageHandler, this,
+					response->id(), boost::asio::placeholders::error));
+}
+
+void RpcSession::SendMessageSize(RpcResponsePtr response)
+{
+	int size = response->ByteSize();
+	std::string ss = boost::lexical_cast(size);
+	boost::asio::async_write(socket, boost::asio::buffer(ss),
+			boost::bind(&RpcSession::SendMessage, this,
+					response, boost::asio::placeholders::error));
 }
+///////////////////////////
 
 void RpcSession::RecvMessegeSize()
 {
@@ -40,11 +68,6 @@ void RpcSession::RecvMessage(IntPtr size, const boost::system::error_code& err)
 					boost::asio::placeholders::error));
 }
 
-ThreadPool& RpcSession::GetThreadPool()
-{
-	return rpc_server.thread_pool;
-}
-
 void RpcSession::RecvMessageHandler(StringPtr ss, const boost::system::error_code& err)
 {
 	if (err)
@@ -53,12 +76,14 @@ void RpcSession::RecvMessageHandler(StringPtr ss, const boost::system::error_cod
 		return;
 	}
 
-	RpcRequestPtr request;
+	RpcRequestPtr request(new RpcRequest);
 	request->ParseFromString(*ss);
 
-	GetThreadPool().PushTask(
-			boost::bind(&RpcServer::RunService, rpc_server.shared_from_this(),
-					shared_from_this(), request));
+	RpcResponsePtr response(new RpcResponse);
+	response->set_id(request->id());
+
+	rpc_server.RunService(shared_from_this(), request,
+			boost::bind(&RpcSession::SendMessegeSize, shared_from_this(), response));
 
 	// read size
 	RecvMessegeSize();

+ 3 - 8
Src/Net/RpcSession.h

@@ -6,20 +6,17 @@
 #include <boost/noncopyable.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/enable_shared_from_this.hpp>
-#include "Net/RpcServer.h"
 
 namespace Hainan {
 
+class RpcServer;
+
 class RpcSession: private boost::noncopyable, public boost::enable_shared_from_this<RpcSession>
 {
 private:
-	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
-
 	boost::asio::ip::tcp::socket socket;
 	RpcServer& rpc_server;
 
-	ThreadPool& GetThreadPool();
-
 public:
 	RpcSession(RpcServer& server);
 	~RpcSession();
@@ -28,8 +25,6 @@ public:
 	void Stop();
 };
 
-typedef boost::shared_ptr<RpcSession> RpcSessionPtr;
-
-}
+} // namespace Hainan
 
 #endif // NET_RPC_SESSION_H