فهرست منبع

增加了一些RpcServer代码

tanghai 15 سال پیش
والد
کامیت
3ee18f2013
8فایلهای تغییر یافته به همراه40 افزوده شده و 44 حذف شده
  1. 4 0
      Src/Net/RpcChannel.cc
  2. 1 1
      Src/Net/RpcChannel.h
  3. 8 5
      Src/Net/RpcServer.cc
  4. 5 2
      Src/Net/RpcServer.h
  5. 0 0
      Src/Net/RpcService.cc
  6. 0 25
      Src/Net/RpcService.h
  7. 16 4
      Src/Net/RpcSession.cc
  8. 6 7
      Src/Net/RpcSession.h

+ 4 - 0
Src/Net/RpcChannel.cc

@@ -66,6 +66,10 @@ void RpcChannel::RecvMessageHandler(
 	RpcHandlerPtr handler = handlers[response.id()];
 	handler->GetResponse()->ParseFromString(response.response());
 
+	if (handler->done != NULL)
+	{
+		handler->done->Run();
+	}
 
 	handlers.erase(response.id());
 

+ 1 - 1
Src/Net/RpcChannel.h

@@ -35,7 +35,7 @@ private:
 			const boost::system::error_code& err);
 
 public:
-	RpcChannel(std::string& host, int port);
+	RpcChannel(boost::asio::io_service& service, std::string& host, int port);
 	~RpcChannel();
 	virtual void CallMethod(
 			const google::protobuf::MethodDescriptor* method,

+ 8 - 5
Src/Net/RpcServer.cc

@@ -8,14 +8,13 @@
 namespace Hainan {
 
 RpcServer::RpcServer(google::protobuf::Service& pservice, int port):
-		service(pservice), io_service()
+		service(pservice), io_service(), thread_pool()
 {
 	boost::asio::ip::address address;
 	address.from_string("localhost");
 	boost::asio::ip::tcp::endpoint endpoint(address, port);
 	acceptor.open(endpoint.protocol());
-	acceptor.set_option(
-			boost::asio::ip::tcp::acceptor::reuse_address(true));
+	acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
 	acceptor.bind(endpoint);
 	acceptor.listen();
 	RpcSessionPtr new_session(new RpcSession(sessions));
@@ -24,8 +23,7 @@ RpcServer::RpcServer(google::protobuf::Service& pservice, int port):
 					boost::asio::placeholders::error));
 }
 
-void RpcServer::HandleAsyncAccept(
-		RpcSessionPtr session, const boost::system::error_code& err)
+void RpcServer::HandleAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err)
 {
 	if (err)
 	{
@@ -39,6 +37,11 @@ void RpcServer::HandleAsyncAccept(
 					boost::asio::placeholders::error));
 }
 
+void RpcServer::RunService(boost::asio::ip::tcp::socket& socket, RpcRequestPtr request)
+{
+
+}
+
 void RpcServer::Start()
 {
 	io_service.run();

+ 5 - 2
Src/Net/RpcServer.h

@@ -3,16 +3,19 @@
 
 namespace Hainan {
 
-class RpcServer
+class RpcServer: public boost::enable_shared_from_this<RpcServer>
 {
 private:
+	friend class RpcSession;
 	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
-
 	google::protobuf::Service& service;
 	boost::asio::io_service io_service;
 	boost::asio::ip::tcp::acceptor acceptor;
+	ThreadPool thread_pool;
 	RpcSessionSet sessions;
 
+	void RunService(boost::asio::ip::tcp::socket& socket, RpcRequestPtr request);
+
 public:
 	RpcServer(google::protobuf::Service& pservice, int port);
 	~RpcServer();

+ 0 - 0
Src/Net/RpcService.cc


+ 0 - 25
Src/Net/RpcService.h

@@ -1,25 +0,0 @@
-#ifndef NET_RPC_SERVICE_H
-#define NET_RPC_SERVICE_H
-
-#include <google/protobuf/service.h>
-
-namespace Hainan {
-
-class RpcService
-{
-private:
-	typedef boost::unordered_map<std::string, RpcHandlerPtr> RpcServiceMap;
-
-	RpcServiceMap services;
-
-
-public:
-	RpcServer(google::protobuf::Service* service);
-	~RpcServer();
-	void Start();
-	void Stop();
-};
-
-} // namespace Hainan
-
-#endif // NET_RPC_SERVICE_H

+ 16 - 4
Src/Net/RpcSession.cc

@@ -11,6 +11,11 @@ boost::asio::ip::tcp::socket& RpcSession::Socket()
 	return socket;
 }
 
+void RpcSession::SendMessegeSize()
+{
+
+}
+
 void RpcSession::RecvMessegeSize()
 {
 	IntPtr size(new int);
@@ -28,15 +33,19 @@ void RpcSession::RecvMessage(IntPtr size, const boost::system::error_code& err)
 		LOG(ERROR) << "receive request size failed";
 		return;
 	}
-	StringPtr ss;
+	StringPtr ss(new std::string);
 	boost::asio::async_read(socket,
 			boost::asio::buffer(*ss, *size),
 			boost::bind(&RpcSession::RecvMessageHandler, this, ss,
 					boost::asio::placeholders::error));
 }
 
-void RpcSession::RecvMessageHandler(StringPtr ss,
-		const boost::system::error_code& err)
+ThreadPool& RpcSession::GetThreadPool()
+{
+	return rpc_server.thread_pool;
+}
+
+void RpcSession::RecvMessageHandler(StringPtr ss, const boost::system::error_code& err)
 {
 	if (err)
 	{
@@ -44,9 +53,12 @@ void RpcSession::RecvMessageHandler(StringPtr ss,
 		return;
 	}
 
-	RpcRequest request;
+	RpcRequestPtr request;
 	request->ParseFromString(*ss);
 
+	GetThreadPool().PushTask(
+			boost::bind(&RpcServer::RunService, rpc_server.shared_from_this(),
+					shared_from_this(), request));
 
 	// read size
 	RecvMessegeSize();

+ 6 - 7
Src/Net/RpcSession.h

@@ -1,28 +1,27 @@
 #ifndef NET_RPC_SESSION_H
 #define NET_RPC_SESSION_H
 
-#include <list>
 #include <boost/asio.hpp>
 #include <boost/array.hpp>
 #include <boost/noncopyable.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/enable_shared_from_this.hpp>
+#include "Net/RpcServer.h"
 
 namespace Hainan {
 
-class RpcSession: private boost::noncopyable,
-		public boost::enable_shared_from_this<RpcSession>
+class RpcSession: private boost::noncopyable, public boost::enable_shared_from_this<RpcSession>
 {
 private:
 	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
 
 	boost::asio::ip::tcp::socket socket;
-	std::list<RpcResponsePtr> responses;
-	RpcSessionSet& sessions;
-	ThreadPool& thread_pool;
+	RpcServer& rpc_server;
+
+	ThreadPool& GetThreadPool();
 
 public:
-	RpcSession(RpcSessionSet& rpc_sessions, ThreadPool& pool);
+	RpcSession(RpcServer& server);
 	~RpcSession();
 	boost::asio::ip::tcp::socket& Socket();
 	void Start();