Explorar el Código

rpc server端,大致代码

tanghai hace 15 años
padre
commit
2e5c3cd778
Se han modificado 4 ficheros con 50 adiciones y 71 borrados
  1. 23 40
      src/Net/RpcServer.cc
  2. 8 2
      src/Net/RpcServer.h
  3. 12 24
      src/Net/RpcSession.cc
  4. 7 5
      src/Net/RpcSession.h

+ 23 - 40
src/Net/RpcServer.cc

@@ -1,72 +1,55 @@
 #include <boost/asio.hpp>
+#include <boost/foreach.hpp>
+#include "Base/Base.h"
 #include "Net/RpcServer.h"
 #include "Net/RpcSession.h"
 
 namespace Hainan {
 
-typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
-
-struct RpcServer::RpcServerFeild
-{
-	boost::asio::io_service io_service;
-	boost::asio::ip::tcp::acceptor acceptor;
-	RpcSessionSet sessions;
-
-	RpcServerFeild();
-	~RpcServerFeild();
-};
-
-RpcServer::RpcServerFeild::RpcServerFeild():
-		io_service(), acceptor(io_service)
-{
-}
-
-RpcServer::RpcServerFeild::~RpcServerFeild()
-{
-}
-
 RpcServer::RpcServer(std::string& host, int port):
-		rpc_server(new RpcServer::RpcServerFeild())
+		rpc_server(new RpcServer::RpcServerInternal())
 {
 	boost::asio::ip::address address;
 	address.from_string(host);
 	boost::asio::ip::tcp::endpoint endpoint(address, port);
-	rpc_server->acceptor.open(endpoint.protocol());
-	rpc_server->acceptor.set_option(
+	acceptor.open(endpoint.protocol());
+	acceptor.set_option(
 			boost::asio::ip::tcp::acceptor::reuse_address(true));
-	rpc_server->acceptor.bind(endpoint);
-	rpc_server->acceptor.listen();
-	RpcSessionPtr session(new RpcSession(
-			rpc_server->io_service,
-			rpc_server->sessions));
-	rpc_server->acceptor.async_accept(session->socket(),
-			boost::bind(&RpcServer::HandleAsyncAccept, this,
-					boost::asio::placeholders::error));
+	acceptor.bind(endpoint);
+	acceptor.listen();
 }
 
-void RpcServer::HandleAsyncAccept(RpcSessionSet session,
-		const boost::system::error_code& err)
+void RpcServer::HandleAsyncAccept(
+		RpcSessionPtr session, const boost::system::error_code& err)
 {
 	if (err)
 	{
 		return;
 	}
 	session->Start();
-	rpc_server->sessions.insert(session);
-	RpcSessionPtr session(new RpcSession(
-			rpc_server->io_service,
-			rpc_server->sessions));
-	rpc_server->acceptor.async_accept(session->socket(),
+	sessions.insert(session);
+	RpcSessionPtr new_session(new RpcSession(io_service, sessions));
+	acceptor.async_accept(new_session->socket(),
 			boost::bind(&RpcServer::HandleAsyncAccept, this,
 					boost::asio::placeholders::error));
 }
 
-void RpcServer::AsyncConnect()
+void RpcServer::Start()
 {
+	RpcSessionPtr new_session(new RpcSession(io_service, sessions));
+	acceptor.async_accept(new_session->socket(),
+			boost::bind(&RpcServer::HandleAsyncAccept, this,
+					boost::asio::placeholders::error));
 }
 
 void RpcServer::Stop()
 {
+	acceptor.close();
+	foreach(RpcSessionPtr session, rpc_server->sessions)
+	{
+		session->stop();
+	}
+	sessions.clear();
 }
 
 } // namespace Hainan

+ 8 - 2
src/Net/RpcServer.h

@@ -6,8 +6,14 @@ namespace Hainan {
 class RpcServer
 {
 private:
-	struct RpcServerFeild;
-	boost::scoped_ptr<RpcServerFeild> rpc_server;
+	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
+
+	boost::asio::io_service io_service;
+	boost::asio::ip::tcp::acceptor acceptor;
+	RpcSessionSet sessions;
+
+	void HandleAsyncAccept(RpcSessionSet session,
+			const boost::system::error_code& err);
 public:
 	RpcServer();
 	~RpcServer();

+ 12 - 24
src/Net/RpcSession.cc

@@ -2,33 +2,13 @@
 
 namespace Hainan {
 
-typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
-
-struct RpcSession::RpcSessionFeild
-{
-	boost::array<char, 8192> buffer;
-	boost::asio::ip::tcp::socket socket;
-	RpcSessionSet& sessions;
-
-	RpcSessionFeild();
-	~RpcSessionFeild();
-};
-
-RpcSession::RpcSession(): rpc_session(new RpcSessionFeild())
+RpcSession::RpcSession()
 {
 }
 
 boost::asio::ip::tcp::socket& RpcSession::Socket()
 {
-	return rpc_session->socket;
-}
-
-void RpcSession::Start()
-{
-	boost::asio::async_read(boost::asio::buffer(rpc_session->buffer),
-			boost::bind(&RpcSession::HandleAsyncRead, shared_from_this(),
-					boost::asio::placeholders::bytes_transferred,
-					boost::asio::placeholders::error));
+	return socket;
 }
 
 void RpcSession::HandleAsyncRead(std::size_t bytes_transferred,
@@ -53,13 +33,21 @@ void RpcSession::HandleAsyncRead(std::size_t bytes_transferred,
 	}
 	else if (err != boost::asio::error::operation_aborted)
 	{
-		rpc_session->sessions.erase(shared_from_this());
+		sessions.erase(shared_from_this());
 	}
 }
 
+void RpcSession::Start()
+{
+	boost::asio::async_read(boost::asio::buffer(buffer),
+			boost::bind(&RpcSession::HandleAsyncRead, shared_from_this(),
+					boost::asio::placeholders::bytes_transferred,
+					boost::asio::placeholders::error));
+}
+
 void RpcSession::Stop()
 {
-	rpc_session->socket.close();
+	socket.close();
 }
 
 }

+ 7 - 5
src/Net/RpcSession.h

@@ -9,13 +9,15 @@
 
 namespace Hainan {
 
-class RpcSession:
-		public boost::enable_shared_from_this<RpcSession>,
-		private boost::noncopyable
+class RpcSession: private boost::noncopyable,
+		public boost::enable_shared_from_this<RpcSession>
 {
 private:
-	struct RpcSessionFeild;
-	boost::scoped_ptr<RpcSessionFeild> rpc_session;
+	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
+
+	boost::array<char, 8192> buffer;
+	boost::asio::ip::tcp::socket socket;
+	RpcSessionSet& sessions;
 
 public:
 	RpcSession();