Просмотр исходного кода

考虑怎么写send和receive protobuf message

tanghai 15 лет назад
Родитель
Сommit
4dab5bd677
5 измененных файлов с 75 добавлено и 19 удалено
  1. 60 17
      src/Net/RpcChannel.cc
  2. 4 0
      src/Net/RpcChannel.h
  3. 1 1
      src/Net/RpcCommunicator.cc
  4. 1 1
      src/Net/RpcCommunicator.h
  5. 9 0
      src/Net/RpcController.h

+ 60 - 17
src/Net/RpcChannel.cc

@@ -7,13 +7,34 @@
 namespace Hainan {
 
 RpcChannel::RpcChannel(std::string& host, int port):
-		id(0)
+		id(0), communicator(host, port)
 {
-	// socket.async_connect(endpoint, );
+	RecvResponse();
 }
 
-void RpcCommunicator::SendRequestHandler(int32 id, RpcHandlerPtr handler,
+void RpcChannel::RecvResponseHandler(IOStreamPtr input, RpcRequestPtr request,
 		const boost::system::error_code& err)
+{
+	if (err)
+	{
+		LOG(FATAL) << "receive response failed";
+	}
+	int32 id = request->id();
+	RpcHandlerPtr handler = handlers[id];
+	handler->GetResponse()->ParsePartialFromIstream(input.get());
+	RecvResponse();
+}
+
+void RpcChannel::RecvResponse()
+{
+	std::stringstream ss;
+	communicator.AsyncRead(boost::asio::buffer(ss),
+			boost::bind(&RpcChannel::RecvResponseHandler, this,
+					boost::asio::placeholders::error));
+}
+
+void RpcChannel::SendRequestHandler(int32 id, RpcHandlerPtr handler,
+		const boost::asio::error_code err)
 {
 	if (err)
 	{
@@ -25,30 +46,52 @@ void RpcCommunicator::SendRequestHandler(int32 id, RpcHandlerPtr handler,
 	}
 }
 
-void RpcChannel::SendRequest(const RpcRequest& request, RpcHandlerPtr handler)
+void RpcChannel::SendRequestHandler(int32 id, RpcHandlerPtr handler,
+		const boost::asio::error_code& err)
 {
-	int size = request.ByteSize();
-	std::stringstream ss;
-	ss << size;
-	ss << request.SerializeAsString();
-	communicator.AsyncWrite(boost::asio::buffer(ss),
+	if (err)
+	{
+		LOG(ERROR) << "SendRequestHandler error:" << e.what();
+		return;
+	}
+}
+
+void RpcChannel::SendSizeHandler(int32 id, RpcHandlerPtr handler,
+		const boost::asio::error_code& err)
+{
+	if (err)
+	{
+		LOG(ERROR) << "SendSizeHandler error:" << e.what();
+		return;
+	}
+	string ss = request.SerializeAsString();
+	boost::asio::async_write(boost::asio::buffer(ss),
 			boost::bind(&RpcChannel::SendRequestHandler, this, request.id(),
 					handler, boost::asio::placeholders::error));
 }
 
+void RpcChannel::SendMessage(const RpcRequestPtr request, RpcHandlerPtr handler)
+{
+	int size = request->ByteSize();
+	string ss = boost::lexical_cast(size);
+	boost::asio::async_write(boost::asio::buffer(ss),
+			boost::bind(&RpcChannel::SendSizeHandler, this, request->id(),
+					handler, boost::asio::placeholders::error));
+}
+
 void RpcChannel::CallMethod(
 		const google::protobuf::MethodDescriptor* method,
 		google::protobuf::RpcController* controller,
 		const google::protobuf::Message* request,
 		google::protobuf::Message* response,
-		google::protobuf::Closure* done) {
-	RpcRequest req;
-	req.set_id(++id);
-	req.set_method(method->full_name());
-	req.set_request(request->SerializeAsString());
-	RpcHandlerPtr handler = boost::make_shared<RpcHandler>(
-			controller, response, done);
-	SendRequest(req, handler);
+		google::protobuf::Closure* done)
+{
+	RpcRequestPtr req(new RpcRequest);
+	req->set_id(++id);
+	req->set_method(method->full_name());
+	req->set_request(request->SerializeAsString());
+	RpcHandlerPtr handler(new RpcHandler(controller, response, done));
+	SendMessage(req, handler);
 }
 
 } // namespace Hainan

+ 4 - 0
src/Net/RpcChannel.h

@@ -19,9 +19,13 @@ private:
 	RpcCallbackMap handlers;
 	RpcCommunicator communicator;
 
+	boost::asio::io_service io_service;
+	boost::asio::ip::tcp::socket socket;
+
 	void SendRequestHandler(int32 id, RpcHandlerPtr handler,
 			const boost::system::error_code& err);
 	void SendRequest(const RpcRequest& request, RpcHandlerPtr handler);
+	void RecvResponse();
 public:
 	RpcChannel(std::string& host, int port);
 	~RpcChannel();

+ 1 - 1
src/Net/RpcCommunicator.cc

@@ -17,7 +17,7 @@ RpcCommunicator::~RpcCommunicator()
 {
 }
 
-void RpcCommunicator::AsyncWrite(boost::asio::buffer buffer,
+void RpcCommunicator::AsyncWrite(boost::asio::buffer& buffer,
 		boost::function<void (const boost::asio::error_code&)> handler)
 {
 	boost::asio::async_write(socket, buffer, handler);

+ 1 - 1
src/Net/RpcCommunicator.h

@@ -15,7 +15,7 @@ private:
 public:
 	RpcCommunicator(std::string& host, int port);
 	~RpcCommunicator();
-	void AsyncWrite(boost::asio::buffer buffer,
+	void AsyncWrite(boost::asio::buffer& buffer,
 			boost::function<void (const boost::asio::error_code&)> handler);
 };
 

+ 9 - 0
src/Net/RpcController.h

@@ -8,7 +8,16 @@ namespace Hainan {
 class RpcController: public google::protobuf::RpcController
 {
 public:
+	RpcController();
+	~RpcController();
 
+	virtual void Reset();
+	virtual bool Failed() const;
+	virtual std::string ErrorText() const;
+	virtual void StartCancel();
+	virtual void SetFailed(const string& reason);
+	virtual bool IsCanceled() const;
+	virtual void NotifyOnCancel(Closure* callback);
 };
 
 } // namespace Hainan