Просмотр исходного кода

暂时删除RpcCommunicator,需要的时候再增加,完善了RpcChannel类

tanghai 15 лет назад
Родитель
Сommit
66a96fb073
5 измененных файлов с 69 добавлено и 97 удалено
  1. 48 34
      src/Net/RpcChannel.cc
  2. 15 7
      src/Net/RpcChannel.h
  3. 0 26
      src/Net/RpcCommunicator.cc
  4. 0 24
      src/Net/RpcCommunicator.h
  5. 6 6
      src/Net/RpcServer.cc

+ 48 - 34
src/Net/RpcChannel.cc

@@ -2,37 +2,41 @@
 #include <boost/make_shared.hpp>
 #include <google/protobuf/message.h>
 #include "Net/RpcChannel.h"
-#include "Net/RpcCommunicator.h"
 
 namespace Hainan {
 
 RpcChannel::RpcChannel(std::string& host, int port):
-		id(0), communicator(host, port)
+		id(0), io_service()
 {
 	// another thread?
-	RecvMessage();
+	boost::asio::ip::address address;
+	address.from_string(host);
+	boost::asio::ip::tcp::endpoint endpoint(address, port);
+	socket.async_connect(endpoint,
+			boost::bind(&RpcChannel::AsyncConnectHandler, this,
+					boost::asio::placeholders::error));
 }
 
-void RpcChannel::RecvResponseHandler(StringPtr ss,
-		const boost::asio::error_code& err)
-{
+void RpcChannel::AsyncConnectHandler(const boost::system::error_code& err) {
 	if (err)
 	{
-		LOG(ERROR) << "receive response failed";
+		LOG(ERROR) << "async connect failed";
 		return;
 	}
+	RecvRequestSize();
+}
 
-	RpcResponse response;
-	Response->ParseFromString(*ss);
-	RpcHandlerPtr handler = handlers[response.id()];
-	handler->GetResponse()->ParseFromString(response.response());
-	handlers.erase(response.id());
-
-	RecvMessage();
+void RpcChannel::RecvRequestSize()
+{
+	IntPtr size(new int);
+	boost::asio::async_read(socket,
+			boost::asio::buffer(
+					reinterpret_cast<char*>(size.get()), sizeof(int)),
+			boost::bind(&RpcChannel::RecvMessage, this, size,
+					boost::asio::placeholders::error));
 }
 
-void RpcChannel::RecvSizeHandler(IntPtr size,
-		const boost::asio::error_code& err)
+void RpcChannel::RecvMessage(IntPtr size, const boost::system::error_code& err)
 {
 	if (err)
 	{
@@ -42,51 +46,61 @@ void RpcChannel::RecvSizeHandler(IntPtr size,
 	StringPtr ss;
 	boost::asio::async_read(socket,
 			boost::asio::buffer(*ss, *size),
-			boost::bind(&RpcChannel::RecvResponseHandler, this, ss,
+			boost::bind(&RpcChannel::RecvMessageHandler, this, ss,
 					boost::asio::placeholders::error));
 }
 
-void RpcChannel::RecvMessage()
+void RpcChannel::RecvMessageHandler(StringPtr ss,
+		const boost::system::error_code& err)
 {
-	IntPtr size(new int);
-	boost::asio::async_read(socket,
-			boost::asio::buffer(
-					reinterpret_cast<char*>(size.get()), sizeof(int)),
-			boost::bind(&RpcChannel::RecvSizeHandler, this, size,
-					boost::asio::placeholders::error));
+	if (err)
+	{
+		LOG(ERROR) << "receive response failed";
+		return;
+	}
+
+	RpcResponse response;
+	Response->ParseFromString(*ss);
+	RpcHandlerPtr handler = handlers[response.id()];
+	handler->GetResponse()->ParseFromString(response.response());
+	handlers.erase(response.id());
+
+	// read size
+	RecvRequestSize();
 }
 
-void RpcChannel::SendRequestHandler(int32 id, RpcHandlerPtr handler,
-		const boost::asio::error_code& err)
+void RpcChannel::SendMessageHandler(int32 id, RpcHandlerPtr handler,
+		const boost::system::error_code& err)
 {
 	if (err)
 	{
-		LOG(ERROR) << "SendRequestHandler error:" << e.what();
+		LOG(ERROR) << "SendMessage error:";
 		return;
 	}
 	handlers[id] = handler;
 }
 
-void RpcChannel::SendSizeHandler(const RpcRequestPtr request,
-		RpcHandlerPtr handler, const boost::asio::error_code& err)
+void RpcChannel::SendMessage(const RpcRequestPtr request,
+		RpcHandlerPtr handler, const boost::system::error_code& err)
 {
 	if (err)
 	{
-		LOG(ERROR) << "SendSizeHandler error:" << e.what();
+		LOG(ERROR) << "SendRequestSize error:";
 		return;
 	}
 	std::string ss = request->SerializeAsString();
 	boost::asio::async_write(socket, boost::asio::buffer(ss),
-			boost::bind(&RpcChannel::SendRequestHandler, this, request->id(),
+			boost::bind(&RpcChannel::SendMessageHandler, this, request->id(),
 					handler, boost::asio::placeholders::error));
 }
 
-void RpcChannel::SendMessage(const RpcRequestPtr request, RpcHandlerPtr handler)
+void RpcChannel::SendRequestSize(const RpcRequestPtr request,
+		RpcHandlerPtr handler)
 {
 	int size = request->ByteSize();
 	std::string ss = boost::lexical_cast(size);
 	boost::asio::async_write(socket, boost::asio::buffer(ss),
-			boost::bind(&RpcChannel::SendSizeHandler, this, request,
+			boost::bind(&RpcChannel::SendMessage, this, request,
 					handler, boost::asio::placeholders::error));
 }
 
@@ -102,7 +116,7 @@ void RpcChannel::CallMethod(
 	req->set_method(method->full_name());
 	req->set_request(request->SerializeAsString());
 	RpcHandlerPtr handler(new RpcHandler(controller, response, done));
-	SendMessage(req, handler);
+	SendRequestSize(req, handler);
 }
 
 } // namespace Hainan

+ 15 - 7
src/Net/RpcChannel.h

@@ -5,7 +5,6 @@
 #include <boost/unordered_map.hpp>
 #include <boost/asio.hpp>
 #include "Base/Base.h"
-#include "Net/RpcCommunicator.h"
 
 namespace Hainan {
 
@@ -13,19 +12,28 @@ class RpcHandler;
 
 class RpcChannel: public google::protobuf::RpcChannel
 {
-	typedef boost::unordered_map<int32, RpcHandlerPtr> RpcCallbackMap;
 private:
+	typedef boost::unordered_map<int32, RpcHandlerPtr> RpcCallbackMap;
+
 	int32 id;
 	RpcCallbackMap handlers;
-	RpcCommunicator communicator;
-
 	boost::asio::io_service io_service;
 	boost::asio::ip::tcp::socket socket;
 
-	void SendRequestHandler(int32 id, RpcHandlerPtr handler,
+	void AsyncConnectHandler(const boost::system::error_code& err);
+
+	// recieve response
+	void RecvRequestSize();
+	void RecvMessage(IntPtr size, const boost::system::error_code& err);
+	void RecvMessageHandler(StringPtr ss, const boost::system::error_code& err);
+
+	// send request
+	void SendRequestSize(const RpcRequestPtr request, RpcHandlerPtr handler);
+	void SendMessage(const RpcRequestPtr request,
+			RpcHandlerPtr handler, const boost::system::error_code& err);
+	void SendMessageHandler(int32 id, RpcHandlerPtr handler,
 			const boost::system::error_code& err);
-	void SendRequest(const RpcRequest& request, RpcHandlerPtr handler);
-	void RecvResponse();
+
 public:
 	RpcChannel(std::string& host, int port);
 	~RpcChannel();

+ 0 - 26
src/Net/RpcCommunicator.cc

@@ -1,26 +0,0 @@
-#include "Net/RpcCommunicator.h"
-
-namespace Hainan {
-
-RpcCommunicator::RpcCommunicator(std::string& host, int port):
-		io_service(), socket(io_service)
-{
-	boost::asio::ip::address address;
-	address.from_string(host);
-	boost::asio::ip::tcp::endpoint endpoint(address, port);
-	socket.async_connect(endpoint,
-			boost::bind(&RpcCommunicator::AsynConnectHandler, this,
-					boost::asio::placeholders::error));
-}
-
-RpcCommunicator::~RpcCommunicator()
-{
-}
-
-void RpcCommunicator::AsyncWrite(boost::asio::buffer& buffer,
-		boost::function<void (const boost::asio::error_code&)> handler)
-{
-	boost::asio::async_write(socket, buffer, handler);
-}
-
-} // namespace Hainan

+ 0 - 24
src/Net/RpcCommunicator.h

@@ -1,24 +0,0 @@
-#ifndef NET_RPC_COMMUNICATOR_H
-#define NET_RPC_COMMUNICATOR_H
-
-#include <boost/asio.hpp>
-
-namespace Hainan {
-
-class RpcRequest;
-
-class RpcCommunicator
-{
-private:
-	boost::asio::io_service io_service;
-	boost::asio::ip::tcp::socket socket;
-public:
-	RpcCommunicator(std::string& host, int port);
-	~RpcCommunicator();
-	void AsyncWrite(boost::asio::buffer& buffer,
-			boost::function<void (const boost::asio::error_code&)> handler);
-};
-
-}
-
-#endif // NET_RPC_COMMUNICATOR_H

+ 6 - 6
src/Net/RpcServer.cc

@@ -6,8 +6,7 @@
 
 namespace Hainan {
 
-RpcServer::RpcServer(std::string& host, int port):
-		rpc_server(new RpcServer::RpcServerInternal())
+RpcServer::RpcServer(std::string& host, int port)
 {
 	boost::asio::ip::address address;
 	address.from_string(host);
@@ -17,6 +16,10 @@ RpcServer::RpcServer(std::string& host, int port):
 			boost::asio::ip::tcp::acceptor::reuse_address(true));
 	acceptor.bind(endpoint);
 	acceptor.listen();
+	RpcSessionPtr new_session(new RpcSession(io_service, sessions));
+	acceptor.async_accept(new_session->socket(),
+			boost::bind(&RpcServer::HandleAsyncAccept, this,
+					boost::asio::placeholders::error));
 }
 
 void RpcServer::HandleAsyncAccept(
@@ -36,10 +39,7 @@ void RpcServer::HandleAsyncAccept(
 
 void RpcServer::Start()
 {
-	RpcSessionPtr new_session(new RpcSession(io_service, sessions));
-	acceptor.async_accept(new_session->socket(),
-			boost::bind(&RpcServer::HandleAsyncAccept, this,
-					boost::asio::placeholders::error));
+	io_service.run();
 }
 
 void RpcServer::Stop()