Selaa lähdekoodia

一些修改,准备写test

tanghai 15 vuotta sitten
vanhempi
commit
5e3a2e0f74
5 muutettua tiedostoa jossa 28 lisäystä ja 19 poistoa
  1. 2 0
      Src/Base/Typedefs.h
  2. 4 4
      Src/Net/RpcServer.cc
  3. 2 2
      Src/Net/RpcServer.h
  4. 10 11
      Src/Net/RpcSession.cc
  5. 10 2
      Src/Net/RpcSession.h

+ 2 - 0
Src/Base/Typedefs.h

@@ -33,10 +33,12 @@ class RpcSession;
 class RpcRequest;
 class RpcChannel;
 class ThreadPool;
+class ThreadPoolIf;
 typedef boost::shared_ptr<RpcSession> RpcSessionPtr;
 typedef boost::shared_ptr<RpcRequest> RpcRequestPtr;
 typedef boost::shared_ptr<RpcChannel> RpcChannelPtr;
 typedef boost::shared_ptr<ThreadPool> ThreadPoolPtr;
+typedef boost::shared_ptr<ThreadPoolIf> ThreadPoolIfPtr;
 
 } // namespace Hainan
 

+ 4 - 4
Src/Net/RpcServer.cc

@@ -8,8 +8,8 @@
 
 namespace Hainan {
 
-RpcServer::RpcServer(boost::asio::io_service& io_service, int port):
-		io_service_(io_service), thread_pool_()
+RpcServer::RpcServer(boost::asio::io_service& io_service, int port, ThreadPool& thread_pool):
+		io_service_(io_service), thread_pool_(thread_pool)
 {
 	boost::asio::ip::address address;
 	address.from_string("localhost");
@@ -32,7 +32,7 @@ void RpcServer::HandleAsyncAccept(RpcSessionPtr session, const boost::system::er
 	}
 	session->Start();
 	sessions_.insert(session);
-	RpcSessionPtr new_session(new RpcSession(io_service_, sessions_));
+	RpcSessionPtr new_session(new RpcSession(*this));
 	acceptor_.async_accept(new_session->socket(),
 			boost::bind(&RpcServer::HandleAsyncAccept, this,
 					boost::asio::placeholders::error));
@@ -47,7 +47,7 @@ void RpcServer::Callback(RpcSessionPtr session,
 void RpcServer::Stop()
 {
 	acceptor_.close();
-	foreach(RpcSessionPtr session, rpc_server->sessions_)
+	foreach(RpcSessionPtr session, sessions_)
 	{
 		session->Stop();
 	}

+ 2 - 2
Src/Net/RpcServer.h

@@ -13,7 +13,7 @@ private:
 	google::protobuf::Service& service_;
 	boost::asio::io_service& io_service_;
 	boost::asio::ip::tcp::acceptor acceptor_;
-	ThreadPool thread_pool_;
+	ThreadPoolIf& thread_pool_;
 	RpcSessionSet sessions_;
 
 	void HandleAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err);
@@ -21,7 +21,7 @@ private:
 			boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler);
 
 public:
-	RpcServer(boost::asio::io_service& io_service, int port);
+	RpcServer(boost::asio::io_service& io_service, int port, ThreadPool& thread_pool);
 	~RpcServer();
 	void Start();
 	void Stop();

+ 10 - 11
Src/Net/RpcSession.cc

@@ -2,13 +2,13 @@
 
 namespace Hainan {
 
-RpcSession::RpcSession(RpcSessionSet& rpc_sessions): sessions_(rpc_sessions)
+RpcSession::RpcSession(RpcServer& rpc_server): rpc_server_(rpc_server)
 {
 }
 
 boost::asio::ip::tcp::socket& RpcSession::Socket()
 {
-	return socket;
+	return socket_;
 }
 
 void RpcSession::SendMessageHandler(int32 id, RpcHandlerPtr handler,
@@ -19,7 +19,6 @@ void RpcSession::SendMessageHandler(int32 id, RpcHandlerPtr handler,
 		LOG(ERROR) << "SendMessage error:";
 		return;
 	}
-	handlers_[id] = handler;
 }
 
 void RpcSession::SendMessage(const RpcResponsePtr response, const boost::system::error_code& err)
@@ -29,16 +28,16 @@ void RpcSession::SendMessage(const RpcResponsePtr response, const boost::system:
 		return;
 	}
 	std::string ss = response->SerializeAsString();
-	boost::asio::async_write(socket, boost::asio::buffer(ss),
+	boost::asio::async_write(socket_, boost::asio::buffer(ss),
 			boost::bind(&RpcSession::SendMessageHandler, this,
-					response->id_(), boost::asio::placeholders::error));
+					response->id(), boost::asio::placeholders::error));
 }
 
 void RpcSession::SendMessageSize(RpcResponsePtr response)
 {
 	int size = response->ByteSize();
 	std::string ss = boost::lexical_cast(size);
-	boost::asio::async_write(socket, boost::asio::buffer(ss),
+	boost::asio::async_write(socket_, boost::asio::buffer(ss),
 			boost::bind(&RpcSession::SendMessage, this,
 					response, boost::asio::placeholders::error));
 }
@@ -47,10 +46,10 @@ void RpcSession::SendMessageSize(RpcResponsePtr response)
 void RpcSession::RecvMessegeSize()
 {
 	IntPtr size(new int);
-	boost::asio::async_read(socket,
+	boost::asio::async_read(socket_,
 			boost::asio::buffer(
 					reinterpret_cast<char*>(size.get()), sizeof(int)),
-			boost::bind(&RpcChannel::RecvMessage, this, size,
+			boost::bind(&RpcSession::RecvMessage, this, size,
 					boost::asio::placeholders::error));
 }
 
@@ -62,7 +61,7 @@ void RpcSession::RecvMessage(IntPtr size, const boost::system::error_code& err)
 		return;
 	}
 	StringPtr ss(new std::string);
-	boost::asio::async_read(socket,
+	boost::asio::async_read(socket_,
 			boost::asio::buffer(*ss, *size),
 			boost::bind(&RpcSession::RecvMessageHandler, this, ss,
 					boost::asio::placeholders::error));
@@ -82,7 +81,7 @@ void RpcSession::RecvMessageHandler(StringPtr ss, const boost::system::error_cod
 	RpcResponsePtr response(new RpcResponse);
 	response->set_id(request->id_());
 
-	rpc_server.RunService(shared_from_this(), request,
+	rpc_server_.RunService(shared_from_this(), request,
 			boost::bind(&RpcSession::SendMessegeSize, shared_from_this(), response));
 
 	// read size
@@ -96,7 +95,7 @@ void RpcSession::Start()
 
 void RpcSession::Stop()
 {
-	socket.close();
+	socket_.close();
 	sessions_.erase(shared_from_this());
 }
 

+ 10 - 2
Src/Net/RpcSession.h

@@ -14,8 +14,16 @@ class RpcServer;
 class RpcSession: private boost::noncopyable, public boost::enable_shared_from_this<RpcSession>
 {
 private:
-	boost::asio::ip::tcp::socket socket;
-	RpcServer& rpc_server;
+	boost::asio::ip::tcp::socket socket_;
+	RpcServer& rpc_server_;
+
+	void RecvMessegeSize();
+	void RecvMessage(IntPtr size, const boost::system::error_code& err);
+	void RecvMessageHandler(StringPtr ss, const boost::system::error_code& err);
+
+	void SendMessageSize(RpcResponsePtr response);
+	void SendMessage(const RpcResponsePtr response, const boost::system::error_code& err);
+	void SendMessageHandler(int32 id, RpcHandlerPtr handler, const boost::system::error_code& err);
 
 public:
 	RpcSession(RpcServer& server);