Просмотр исходного кода

把message type记录在消息头中

tanghai 14 лет назад
Родитель
Сommit
e0d285a563

+ 3 - 4
Src/Egametang/Rpc/CMakeLists.txt

@@ -1,5 +1,4 @@
 PROTOBUF_GENERATE_CPP(proto_srcs proto_hdrs 
 PROTOBUF_GENERATE_CPP(proto_srcs proto_hdrs 
-	RpcData.proto 
 	Echo.proto
 	Echo.proto
 	)
 	)
 
 
@@ -7,7 +6,7 @@ SET(RpcSrc
 	RpcCommunicator.cc
 	RpcCommunicator.cc
 	RpcController.cc
 	RpcController.cc
 	RpcHandler.cc
 	RpcHandler.cc
-	RpcChannel.cc
+#	RpcChannel.cc
 	${proto_srcs}
 	${proto_srcs}
 	)
 	)
 
 
@@ -15,11 +14,11 @@ SET(RpcSrc
 ADD_LIBRARY(Rpc ${RpcSrc})
 ADD_LIBRARY(Rpc ${RpcSrc})
 
 
 ADD_EXECUTABLE(RpcCommunicatorTest RpcCommunicatorTest.cc)
 ADD_EXECUTABLE(RpcCommunicatorTest RpcCommunicatorTest.cc)
-ADD_EXECUTABLE(RpcChannelTest RpcChannelTest.cc)
+#ADD_EXECUTABLE(RpcChannelTest RpcChannelTest.cc)
 
 
 SET(Excutes 
 SET(Excutes 
 	RpcCommunicatorTest
 	RpcCommunicatorTest
-	RpcChannelTest
+#	RpcChannelTest
 )
 )
 
 
 FOREACH(Excute ${Excutes})
 FOREACH(Excute ${Excutes})

+ 12 - 15
Src/Egametang/Rpc/RpcChannel.cc

@@ -6,7 +6,6 @@
 #include "Rpc/RpcCommunicator.h"
 #include "Rpc/RpcCommunicator.h"
 #include "Rpc/RpcChannel.h"
 #include "Rpc/RpcChannel.h"
 #include "Rpc/RpcHandler.h"
 #include "Rpc/RpcHandler.h"
-#include "Rpc/RpcData.pb.h"
 
 
 namespace Egametang {
 namespace Egametang {
 
 
@@ -33,7 +32,7 @@ void RpcChannel::OnAsyncConnect(const boost::system::error_code& err)
 		LOG(ERROR) << "async connect failed: " << err.message();
 		LOG(ERROR) << "async connect failed: " << err.message();
 		return;
 		return;
 	}
 	}
-	RecvSize();
+	RecvMeta();
 }
 }
 
 
 void RpcChannel::OnRecvMessage(StringPtr ss)
 void RpcChannel::OnRecvMessage(StringPtr ss)
@@ -51,20 +50,13 @@ void RpcChannel::OnRecvMessage(StringPtr ss)
 	}
 	}
 
 
 	// read size
 	// read size
-	RecvSize();
+	RecvMeta();
 }
 }
 
 
 void RpcChannel::OnSendMessage()
 void RpcChannel::OnSendMessage()
 {
 {
 }
 }
 
 
-void RpcChannel::SendRequest(RpcRequestPtr request)
-{
-	int size = request->ByteSize();
-	std::string message = request->SerializeAsString();
-	SendSize(size, message);
-}
-
 void RpcChannel::Stop()
 void RpcChannel::Stop()
 {
 {
 	RpcCommunicator::Stop();
 	RpcCommunicator::Stop();
@@ -77,14 +69,19 @@ void RpcChannel::CallMethod(
 		google::protobuf::Message* response,
 		google::protobuf::Message* response,
 		google::protobuf::Closure* done)
 		google::protobuf::Closure* done)
 {
 {
-	RpcRequestPtr req(new RpcRequest);
-	req->set_id(++id_);
-	req->set_method(method->full_name());
-	req->set_request(request->SerializeAsString());
 	RpcHandlerPtr handler(new RpcHandler(controller, response, done));
 	RpcHandlerPtr handler(new RpcHandler(controller, response, done));
 	handlers_[id_] = handler;
 	handlers_[id_] = handler;
 
 
-	SendRequest(req);
+	boost::hash<std::string> string_hash;
+
+	std::string message = request->SerializeAsString();
+	RpcMeta meta;
+	meta.size = message.size();
+	meta.id = ++id_;
+	meta.opcode = string_hash(method->full_name());
+	meta.checksum = string_hash(message);
+
+	SendMeta(meta, message);
 }
 }
 
 
 } // namespace Egametang
 } // namespace Egametang

+ 0 - 1
Src/Egametang/Rpc/RpcChannel.h

@@ -23,7 +23,6 @@ private:
 	RpcCallbackMap handlers_;
 	RpcCallbackMap handlers_;
 
 
 	void OnAsyncConnect(const boost::system::error_code& err);
 	void OnAsyncConnect(const boost::system::error_code& err);
-	void SendRequest(RpcRequestPtr request);
 
 
 	virtual void OnRecvMessage(StringPtr ss);
 	virtual void OnRecvMessage(StringPtr ss);
 	virtual void OnSendMessage();
 	virtual void OnSendMessage();

+ 0 - 1
Src/Egametang/Rpc/RpcChannelTest.cc

@@ -5,7 +5,6 @@
 #include "Thread/CountBarrier.h"
 #include "Thread/CountBarrier.h"
 #include "Thread/ThreadPool.h"
 #include "Thread/ThreadPool.h"
 #include "Rpc/RpcController.h"
 #include "Rpc/RpcController.h"
-#include "Rpc/RpcData.pb.h"
 #include "Rpc/Echo.pb.h"
 #include "Rpc/Echo.pb.h"
 
 
 namespace Egametang {
 namespace Egametang {

+ 15 - 13
Src/Egametang/Rpc/RpcCommunicator.cc

@@ -16,43 +16,45 @@ boost::asio::ip::tcp::socket& RpcCommunicator::Socket()
 	return socket_;
 	return socket_;
 }
 }
 
 
-void RpcCommunicator::RecvSize()
+void RpcCommunicator::RecvMeta()
 {
 {
-	IntPtr size(new int(0));
+	RpcMetaPtr meta(new RpcMeta());
 	boost::asio::async_read(socket_,
 	boost::asio::async_read(socket_,
-			boost::asio::buffer(reinterpret_cast<char*>(size.get()), sizeof(int)),
-			boost::bind(&RpcCommunicator::RecvMessage, this, size,
+			boost::asio::buffer(reinterpret_cast<char*>(meta.get()), sizeof(*meta)),
+			boost::bind(&RpcCommunicator::RecvMessage, this, meta,
 					boost::asio::placeholders::error));
 					boost::asio::placeholders::error));
 }
 }
 
 
-void RpcCommunicator::RecvMessage(IntPtr size, const boost::system::error_code& err)
+void RpcCommunicator::RecvMessage(RpcMetaPtr meta, const boost::system::error_code& err)
 {
 {
 	if (err)
 	if (err)
 	{
 	{
 		LOG(ERROR) << "receive message size failed: " << err.message();
 		LOG(ERROR) << "receive message size failed: " << err.message();
 		return;
 		return;
 	}
 	}
-	StringPtr ss(new std::string(*size, '\0'));
+	StringPtr message(new std::string(meta->size, '\0'));
 	boost::asio::async_read(socket_,
 	boost::asio::async_read(socket_,
-			boost::asio::buffer(reinterpret_cast<char*>(&ss->at(0)), *size),
-			boost::bind(&RpcCommunicator::RecvDone, this, ss,
+			boost::asio::buffer(reinterpret_cast<char*>(&message->at(0)), meta->size),
+			boost::bind(&RpcCommunicator::RecvDone, this,
+					meta, message,
 					boost::asio::placeholders::error));
 					boost::asio::placeholders::error));
 }
 }
 
 
-void RpcCommunicator::RecvDone(StringPtr ss, const boost::system::error_code& err)
+void RpcCommunicator::RecvDone(RpcMetaPtr meta, StringPtr message,
+		const boost::system::error_code& err)
 {
 {
 	if (err)
 	if (err)
 	{
 	{
 		LOG(ERROR) << "receive message failed: " << err.message();
 		LOG(ERROR) << "receive message failed: " << err.message();
 		return;
 		return;
 	}
 	}
-	OnRecvMessage(ss);
+	OnRecvMessage(meta, message);
 }
 }
 
 
-void RpcCommunicator::SendSize(int size, std::string message)
+void RpcCommunicator::SendMeta(RpcMeta& meta, std::string message)
 {
 {
 	boost::asio::async_write(socket_,
 	boost::asio::async_write(socket_,
-			boost::asio::buffer(reinterpret_cast<char*>(&size), sizeof(int)),
+			boost::asio::buffer(reinterpret_cast<char*>(&meta), sizeof(meta)),
 			boost::bind(&RpcCommunicator::SendMessage, this, message,
 			boost::bind(&RpcCommunicator::SendMessage, this, message,
 					boost::asio::placeholders::error));
 					boost::asio::placeholders::error));
 }
 }
@@ -78,7 +80,7 @@ void RpcCommunicator::SendDone(const boost::system::error_code& err)
 	OnSendMessage();
 	OnSendMessage();
 }
 }
 
 
-void RpcCommunicator::OnRecvMessage(StringPtr ss)
+void RpcCommunicator::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
 {
 {
 }
 }
 
 

+ 39 - 5
Src/Egametang/Rpc/RpcCommunicator.h

@@ -10,6 +10,39 @@
 
 
 namespace Egametang {
 namespace Egametang {
 
 
+struct RpcMeta
+{
+	// message长度
+	std::size_t size;
+
+	// 消息id, 用于处理异步回调
+	std::size_t id;
+
+	// 消息opcode, 是proto的full_path哈希值
+	std::size_t opcode;
+
+	// 校验值, 整个message的哈希值
+	std::size_t checksum;
+
+	RpcMeta(): size(0), id(0), opcode(0), checksum(0)
+	{
+	}
+
+	bool Verify(std::string message)
+	{
+		boost::hash<std::string> string_hash;
+		if (checksum == string_hash(message))
+		{
+			return true;
+		}
+		return false;
+	}
+
+	std::string ToString()
+	{
+	}
+};
+
 class RpcCommunicator
 class RpcCommunicator
 {
 {
 protected:
 protected:
@@ -22,17 +55,18 @@ public:
 	boost::asio::ip::tcp::socket& Socket();
 	boost::asio::ip::tcp::socket& Socket();
 
 
 	// recieve response
 	// recieve response
-	void RecvSize();
-	void RecvMessage(IntPtr size, const boost::system::error_code& err);
-	void RecvDone(StringPtr ss, const boost::system::error_code& err);
+	void RecvMeta();
+	void RecvMessage(RpcMetaPtr meta, const boost::system::error_code& err);
+	void RecvDone(RpcMetaPtr meta, StringPtr message, const boost::system::error_code& err);
 
 
 	// send request
 	// send request
-	void SendSize(int size, std::string message);
+	void SendMeta(RpcMeta& meta, std::string message);
 	void SendMessage(std::string message, const boost::system::error_code& err);
 	void SendMessage(std::string message, const boost::system::error_code& err);
 	void SendDone(const boost::system::error_code& err);
 	void SendDone(const boost::system::error_code& err);
 
 
-	virtual void OnRecvMessage(StringPtr ss);
+	virtual void OnRecvMessage(RpcMetaPtr meta, StringPtr message);
 	virtual void OnSendMessage();
 	virtual void OnSendMessage();
+
 	virtual void Stop();
 	virtual void Stop();
 };
 };
 
 

+ 40 - 14
Src/Egametang/Rpc/RpcCommunicatorTest.cc

@@ -16,6 +16,7 @@ class RpcServerTest: public RpcCommunicator
 public:
 public:
 	CountBarrier& barrier_;
 	CountBarrier& barrier_;
 	std::string recv_string_;
 	std::string recv_string_;
+	RpcMetaPtr meta_;
 	boost::asio::ip::tcp::acceptor acceptor_;
 	boost::asio::ip::tcp::acceptor acceptor_;
 
 
 public:
 public:
@@ -42,7 +43,7 @@ public:
 			LOG(ERROR) << "async accept failed: " << err.message();
 			LOG(ERROR) << "async accept failed: " << err.message();
 			return;
 			return;
 		}
 		}
-		RecvSize();
+		RecvMeta();
 	}
 	}
 
 
 	void Start()
 	void Start()
@@ -57,12 +58,20 @@ public:
 		socket_.close();
 		socket_.close();
 	}
 	}
 
 
-	virtual void OnRecvMessage(StringPtr ss)
+	virtual void OnRecvMessage(RpcMetaPtr meta, StringPtr message)
 	{
 	{
-		VLOG(2) << "Server Recv string: " << *ss;
-		recv_string_ = *ss;
-		std::string send_string("response test rpc communicator string");
-		SendSize(send_string.size(), send_string);
+		VLOG(2) << "Server Recv string: " << *message;
+		recv_string_ = *message;
+		meta_ = meta;
+
+		boost::hash<std::string> string_hash;
+
+		std::string response_string("response test rpc communicator string");
+		RpcMeta response_meta;
+		response_meta.size = response_string.size();
+		response_meta.checksum = string_hash(response_string);
+		response_meta.opcode = 123456;
+		SendMeta(response_meta, response_string);
 		barrier_.Signal();
 		barrier_.Signal();
 	}
 	}
 	virtual void OnSendMessage()
 	virtual void OnSendMessage()
@@ -75,6 +84,7 @@ class RpcClientTest: public RpcCommunicator
 public:
 public:
 	CountBarrier& barrier_;
 	CountBarrier& barrier_;
 	std::string recv_string_;
 	std::string recv_string_;
+	RpcMetaPtr meta_;
 
 
 public:
 public:
 	RpcClientTest(boost::asio::io_service& io_service, int port,
 	RpcClientTest(boost::asio::io_service& io_service, int port,
@@ -107,15 +117,22 @@ public:
 			LOG(ERROR) << "async connect failed: " << err.message();
 			LOG(ERROR) << "async connect failed: " << err.message();
 			return;
 			return;
 		}
 		}
+		boost::hash<std::string> string_hash;
+
 		std::string send_string("send test rpc communicator string");
 		std::string send_string("send test rpc communicator string");
-		SendSize(send_string.size(), send_string);
-		RecvSize();
+		RpcMeta meta;
+		meta.size = send_string.size();
+		meta.checksum = string_hash(send_string);
+		meta.opcode = 654321;
+		SendMeta(meta, send_string);
+		RecvMeta();
 	}
 	}
 
 
-	virtual void OnRecvMessage(StringPtr ss)
+	virtual void OnRecvMessage(RpcMetaPtr meta, StringPtr message)
 	{
 	{
-		VLOG(2) << "Client Recv string: " << *ss;
-		recv_string_ = *ss;
+		VLOG(2) << "Client Recv string: " << *message;
+		recv_string_ = *message;
+		meta_ = meta;
 		barrier_.Signal();
 		barrier_.Signal();
 	}
 	}
 
 
@@ -143,17 +160,26 @@ public:
 };
 };
 
 
 
 
-TEST_F(RpcCommunicatorTest, ClientSendString)
+TEST_F(RpcCommunicatorTest, SendAndRecvString)
 {
 {
 	ThreadPool thread_pool(2);
 	ThreadPool thread_pool(2);
 	thread_pool.PushTask(boost::bind(&RpcServerTest::Start, &rpc_server_));
 	thread_pool.PushTask(boost::bind(&RpcServerTest::Start, &rpc_server_));
 	thread_pool.PushTask(boost::bind(&RpcClientTest::Start, &rpc_client_));
 	thread_pool.PushTask(boost::bind(&RpcClientTest::Start, &rpc_client_));
 	barrier_.Wait();
 	barrier_.Wait();
-	ASSERT_EQ(std::string("send test rpc communicator string"), rpc_server_.recv_string_);
-	ASSERT_EQ(std::string("response test rpc communicator string"), rpc_client_.recv_string_);
 	thread_pool.Wait();
 	thread_pool.Wait();
 	rpc_server_.Stop();
 	rpc_server_.Stop();
 	rpc_client_.Stop();
 	rpc_client_.Stop();
+
+	boost::hash<std::string> string_hash;
+	ASSERT_EQ(std::string("send test rpc communicator string"), rpc_server_.recv_string_);
+	ASSERT_EQ(rpc_server_.meta_->size, rpc_server_.recv_string_.size());
+	ASSERT_EQ(rpc_server_.meta_->checksum, string_hash(rpc_server_.recv_string_));
+	ASSERT_EQ(654321U, rpc_server_.meta_->opcode);
+
+	ASSERT_EQ(std::string("response test rpc communicator string"), rpc_client_.recv_string_);
+	ASSERT_EQ(rpc_client_.meta_->size, rpc_client_.recv_string_.size());
+	ASSERT_EQ(rpc_client_.meta_->checksum, string_hash(rpc_client_.recv_string_));
+	ASSERT_EQ(123456U, rpc_client_.meta_->opcode);
 }
 }
 
 
 } // namespace Egametang
 } // namespace Egametang

+ 0 - 22
Src/Egametang/Rpc/RpcData.proto

@@ -1,22 +0,0 @@
-package Egametang;
-
-enum ResponseType
-{
-	RESPONSE_TYPE_OK = 1;
-	RESPONSE_TYPE_ERROR = 2;
-}
-
-message RpcRequest
-{
-	required int32 id = 1;
-	required string method = 2;
-	optional bytes request = 3;
-}
-
-message RpcResponse
-{
-	required int32 id = 1;
-	required ResponseType type = 2;
-	optional bytes error = 3;
-	optional bytes response = 4;
-}

+ 2 - 0
Src/Egametang/Rpc/RpcTypedef.h

@@ -15,11 +15,13 @@ class RpcRequest;
 class RpcChannel;
 class RpcChannel;
 class RpcHandler;
 class RpcHandler;
 class RpcResponse;
 class RpcResponse;
+class RpcMeta;
 typedef boost::shared_ptr<RpcSession>  RpcSessionPtr;
 typedef boost::shared_ptr<RpcSession>  RpcSessionPtr;
 typedef boost::shared_ptr<RpcRequest>  RpcRequestPtr;
 typedef boost::shared_ptr<RpcRequest>  RpcRequestPtr;
 typedef boost::shared_ptr<RpcChannel>  RpcChannelPtr;
 typedef boost::shared_ptr<RpcChannel>  RpcChannelPtr;
 typedef boost::shared_ptr<RpcHandler>  RpcHandlerPtr;
 typedef boost::shared_ptr<RpcHandler>  RpcHandlerPtr;
 typedef boost::shared_ptr<RpcResponse> RpcResponsePtr;
 typedef boost::shared_ptr<RpcResponse> RpcResponsePtr;
+typedef boost::shared_ptr<RpcMeta> 	   RpcMetaPtr;
 
 
 } // namespace Egametang
 } // namespace Egametang