Преглед изворни кода

考虑写rpc server端,网络暂时用单线程,逻辑要使用多线程

tanghai пре 15 година
родитељ
комит
406708715e

+ 45 - 0
src/Net/RpcController.cc

@@ -1 +1,46 @@
 #include "Net/RpcController.h"
+
+namespace Hainan {
+
+Hainan::RpcController::RpcController()
+{
+}
+
+Hainan::RpcController::~RpcController()
+{
+}
+
+void Hainan::RpcController::Reset()
+{
+	failed = false;
+	reason = "";
+	canceled = false;
+}
+
+bool Hainan::RpcController::Failed() const
+{
+	return failed;
+}
+
+std::string Hainan::RpcController::ErrorText() const
+{
+	return reason;
+}
+
+void Hainan::RpcController::StartCancel()
+{
+}
+
+void Hainan::RpcController::SetFailed(const string & reason)
+{
+}
+
+bool Hainan::RpcController::IsCanceled() const
+{
+}
+
+void Hainan::RpcController::NotifyOnCancel(Closure *callback)
+{
+}
+
+} // namespace Hainan

+ 8 - 0
src/Net/RpcController.h

@@ -7,14 +7,22 @@ namespace Hainan {
 
 class RpcController: public google::protobuf::RpcController
 {
+private:
+	bool failed;
+	std::string reason;
+	bool canceled;
+
 public:
 	RpcController();
 	~RpcController();
 
+	// client
 	virtual void Reset();
 	virtual bool Failed() const;
 	virtual std::string ErrorText() const;
 	virtual void StartCancel();
+
+	// server
 	virtual void SetFailed(const string& reason);
 	virtual bool IsCanceled() const;
 	virtual void NotifyOnCancel(Closure* callback);

+ 2 - 1
src/Net/RpcProtobufData.proto

@@ -17,5 +17,6 @@ message RpcResponse
 {
 	required int32 id = 1;
 	required ResponseType type = 2;
-	optional bytes response = 3;
+	optional bytes error = 3;
+	optional bytes response = 4;
 }

+ 2 - 2
src/Net/RpcServer.cc

@@ -6,7 +6,7 @@
 
 namespace Hainan {
 
-RpcServer::RpcServer(std::string& host, int port)
+RpcServer::RpcServer(std::string& host, int port): thread_pool()
 {
 	boost::asio::ip::address address;
 	address.from_string(host);
@@ -16,7 +16,7 @@ RpcServer::RpcServer(std::string& host, int port)
 			boost::asio::ip::tcp::acceptor::reuse_address(true));
 	acceptor.bind(endpoint);
 	acceptor.listen();
-	RpcSessionPtr new_session(new RpcSession(io_service, sessions));
+	RpcSessionPtr new_session(new RpcSession(sessions));
 	acceptor.async_accept(new_session->socket(),
 			boost::bind(&RpcServer::HandleAsyncAccept, this,
 					boost::asio::placeholders::error));

+ 3 - 2
src/Net/RpcServer.h

@@ -6,14 +6,15 @@ namespace Hainan {
 class RpcServer
 {
 private:
+	typedef boost::unordered_map<std::string, RpcHandlerPtr> RpcServiceMap;
 	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
 
+	RpcServiceMap services;
 	boost::asio::io_service io_service;
 	boost::asio::ip::tcp::acceptor acceptor;
+	ThreadPool thread_pool;
 	RpcSessionSet sessions;
 
-	void HandleAsyncAccept(RpcSessionSet session,
-			const boost::system::error_code& err);
 public:
 	RpcServer();
 	~RpcServer();

+ 37 - 24
src/Net/RpcSession.cc

@@ -2,7 +2,7 @@
 
 namespace Hainan {
 
-RpcSession::RpcSession()
+RpcSession::RpcSession(RpcSessionSet& rpc_sessions): sessions(rpc_sessions)
 {
 }
 
@@ -11,43 +11,56 @@ boost::asio::ip::tcp::socket& RpcSession::Socket()
 	return socket;
 }
 
-void RpcSession::HandleAsyncRead(std::size_t bytes_transferred,
-		const boost::system::error_code& err)
+void RpcSession::RecvRequestSize()
 {
-	if (!err)
+	IntPtr size(new int);
+	boost::asio::async_read(socket,
+			boost::asio::buffer(
+					reinterpret_cast<char*>(size.get()), sizeof(int)),
+			boost::bind(&RpcChannel::RecvMessage, this, size,
+					boost::asio::placeholders::error));
+}
+
+void RpcSession::RecvMessage(IntPtr size, const boost::system::error_code& err)
+{
+	if (err)
 	{
-		bool result;
-		result = request_parser.parse();
-
-		if (result)
-		{
-
-		}
-		else
-		{
-			boost::asio::async_read(boost::asio::buffer(buffer),
-					boost::bind(&RpcSession::HandleAsyncRead, shared_from_this(),
-							boost::asio::placeholders::bytes_transferred,
-							boost::asio::placeholders::error));
-		}
+		LOG(ERROR) << "receive request size failed";
+		return;
 	}
-	else if (err != boost::asio::error::operation_aborted)
+	StringPtr ss;
+	boost::asio::async_read(socket,
+			boost::asio::buffer(*ss, *size),
+			boost::bind(&RpcSession::RecvMessageHandler, this, ss,
+					boost::asio::placeholders::error));
+}
+
+void RpcSession::RecvMessageHandler(StringPtr ss,
+		const boost::system::error_code& err)
+{
+	if (err)
 	{
-		sessions.erase(shared_from_this());
+		LOG(ERROR) << "receive request message failed";
+		return;
 	}
+
+	RpcRequest request;
+	request->ParseFromString(*ss);
+
+
+	// read size
+	RecvRequestSize();
 }
 
 void RpcSession::Start()
 {
-	boost::asio::async_read(boost::asio::buffer(buffer),
-			boost::bind(&RpcSession::HandleAsyncRead, shared_from_this(),
-					boost::asio::placeholders::bytes_transferred,
-					boost::asio::placeholders::error));
+	RecvRequestSize();
 }
 
 void RpcSession::Stop()
 {
 	socket.close();
+	sessions.erase(shared_from_this());
 }
 
 }

+ 2 - 2
src/Net/RpcSession.h

@@ -15,12 +15,12 @@ class RpcSession: private boost::noncopyable,
 private:
 	typedef boost::unordered_set<RpcSessionPtr> RpcSessionSet;
 
-	boost::array<char, 8192> buffer;
 	boost::asio::ip::tcp::socket socket;
 	RpcSessionSet& sessions;
+	ThreadPool& thread_pool;
 
 public:
-	RpcSession();
+	RpcSession(RpcSessionSet& rpc_sessions);
 	~RpcSession();
 	boost::asio::ip::tcp::socket& Socket();
 	void Start();