Просмотр исходного кода

大致完成了client端的rpc流程

tanghai 15 лет назад
Родитель
Сommit
0ff015db6f
4 измененных файлов с 49 добавлено и 38 удалено
  1. 3 0
      src/Base/Typedefs.h
  2. 41 30
      src/Net/RpcChannel.cc
  3. 4 7
      src/Thread/ThreadPool.cc
  4. 1 1
      src/Thread/ThreadPool.h

+ 3 - 0
src/Base/Typedefs.h

@@ -12,7 +12,10 @@ typedef boost::uint32_t uint32;
 typedef boost::uint64_t uint64;
 
 // smart_ptr typedef
+typedef boost::shared_ptr<int> IntPtr;
+typedef boost::shared_ptr<std::string> StringPtr;
 typedef boost::shared_ptr<boost::thread> ThreadPtr;
 typedef boost::shared_ptr<google::protobuf::Service> ProtobufServicePtr;
+typedef boost::shared_ptr<google::protobuf::Message> ProtobufMessagePtr;
 
 #endif // BASE_TYPEDEFS_H

+ 41 - 30
src/Net/RpcChannel.cc

@@ -9,41 +9,51 @@ namespace Hainan {
 RpcChannel::RpcChannel(std::string& host, int port):
 		id(0), communicator(host, port)
 {
-	RecvResponse();
+	// another thread?
+	RecvMessage();
 }
 
-void RpcChannel::RecvResponseHandler(IOStreamPtr input, RpcRequestPtr request,
-		const boost::system::error_code& err)
+void RpcChannel::RecvResponseHandler(StringPtr ss,
+		const boost::asio::error_code& err)
 {
 	if (err)
 	{
-		LOG(FATAL) << "receive response failed";
+		LOG(ERROR) << "receive response failed";
+		return;
 	}
-	int32 id = request->id();
-	RpcHandlerPtr handler = handlers[id];
-	handler->GetResponse()->ParsePartialFromIstream(input.get());
-	RecvResponse();
-}
 
-void RpcChannel::RecvResponse()
-{
-	std::stringstream ss;
-	communicator.AsyncRead(boost::asio::buffer(ss),
-			boost::bind(&RpcChannel::RecvResponseHandler, this,
-					boost::asio::placeholders::error));
+	RpcResponse response;
+	Response->ParseFromString(*ss);
+	RpcHandlerPtr handler = handlers[response.id()];
+	handler->GetResponse()->ParseFromString(response.response());
+	handlers.erase(response.id());
+
+	RecvMessage();
 }
 
-void RpcChannel::SendRequestHandler(int32 id, RpcHandlerPtr handler,
-		const boost::asio::error_code err)
+void RpcChannel::RecvSizeHandler(IntPtr size,
+		const boost::asio::error_code& err)
 {
 	if (err)
 	{
-		handler->GetController()->SetFailed("failed");
-	}
-	else
-	{
-		handlers[id] = handler;
+		LOG(ERROR) << "receive response size failed";
+		return;
 	}
+	StringPtr ss;
+	boost::asio::async_read(socket,
+			boost::asio::buffer(*ss, *size),
+			boost::bind(&RpcChannel::RecvResponseHandler, this, ss,
+					boost::asio::placeholders::error));
+}
+
+void RpcChannel::RecvMessage()
+{
+	IntPtr size(new int);
+	boost::asio::async_read(socket,
+			boost::asio::buffer(
+					reinterpret_cast<char*>(size.get()), sizeof(int)),
+			boost::bind(&RpcChannel::RecvSizeHandler, this, size,
+					boost::asio::placeholders::error));
 }
 
 void RpcChannel::SendRequestHandler(int32 id, RpcHandlerPtr handler,
@@ -54,28 +64,29 @@ void RpcChannel::SendRequestHandler(int32 id, RpcHandlerPtr handler,
 		LOG(ERROR) << "SendRequestHandler error:" << e.what();
 		return;
 	}
+	handlers[id] = handler;
 }
 
-void RpcChannel::SendSizeHandler(int32 id, RpcHandlerPtr handler,
-		const boost::asio::error_code& err)
+void RpcChannel::SendSizeHandler(const RpcRequestPtr request,
+		RpcHandlerPtr handler, const boost::asio::error_code& err)
 {
 	if (err)
 	{
 		LOG(ERROR) << "SendSizeHandler error:" << e.what();
 		return;
 	}
-	string ss = request.SerializeAsString();
-	boost::asio::async_write(boost::asio::buffer(ss),
-			boost::bind(&RpcChannel::SendRequestHandler, this, request.id(),
+	std::string ss = request->SerializeAsString();
+	boost::asio::async_write(socket, boost::asio::buffer(ss),
+			boost::bind(&RpcChannel::SendRequestHandler, this, request->id(),
 					handler, boost::asio::placeholders::error));
 }
 
 void RpcChannel::SendMessage(const RpcRequestPtr request, RpcHandlerPtr handler)
 {
 	int size = request->ByteSize();
-	string ss = boost::lexical_cast(size);
-	boost::asio::async_write(boost::asio::buffer(ss),
-			boost::bind(&RpcChannel::SendSizeHandler, this, request->id(),
+	std::string ss = boost::lexical_cast(size);
+	boost::asio::async_write(socket, boost::asio::buffer(ss),
+			boost::bind(&RpcChannel::SendSizeHandler, this, request,
 					handler, boost::asio::placeholders::error));
 }
 

+ 4 - 7
src/Thread/ThreadPool.cc

@@ -1,15 +1,13 @@
+#include <boost/detail/atomic_count.hpp>
 #include <glog/logging.h>
 #include "Thread/ThreadPool.h"
 
 namespace Hainan {
 
 ThreadPool::ThreadPool(int num) :
-	thread_num(num), running(false), work_num(0)
+	thread_num(num? num : boost::thread::hardware_concurrency()),
+	work_num(thread_num), running(false), work_num(0)
 {
-	if (thread_num == 0)
-	{
-		thread_num = boost::thread::hardware_concurrency();
-	}
 }
 
 ThreadPool::~ThreadPool()
@@ -26,7 +24,6 @@ void ThreadPool::Start()
 		threads.push_back(t);
 		t->detach();
 	}
-	work_num = thread_num;
 }
 
 void ThreadPool::Stop()
@@ -76,7 +73,7 @@ void ThreadPool::Runner()
 			task();
 		}
 	}
-	if (boost::detail::atomic_increment(&work_num) == 0)
+	if (--work_num == 0)
 	{
 		VLOG(3) << "work_num = " << work_num;
 		done.notify_one();

+ 1 - 1
src/Thread/ThreadPool.h

@@ -13,7 +13,7 @@ class ThreadPool: private boost::noncopyable
 {
 private:
 	int thread_num;
-	volatile int work_num;
+	boost::detail::atomic_count work_num;
 	volatile bool running;
 	boost::mutex mutex;
 	boost::condition_variable cond;