Преглед изворни кода

简单实现客户端channel

tanghai пре 15 година
родитељ
комит
111d062ea8

+ 2 - 2
src/Base/Base.h

@@ -1,7 +1,7 @@
 #ifndef BASE_BASE_H
 #define BASE_BASE_H
 
-#include "base/typedefs.h"
-#include "base/marcos.h"
+#include "Base/Typedefs.h"
+#include "Base/Marcos.h"
 
 #endif // BASE_BASE_H

+ 34 - 19
src/Net/RpcChannel.cc

@@ -1,39 +1,54 @@
+#include <boost/asio.hpp>
+#include <boost/make_shared.hpp>
 #include <google/protobuf/message.h>
-#include "net/rpc_channel.h"
+#include "Net/RpcChannel.h"
+#include "Net/RpcCommunicator.h"
 
 namespace Hainan {
 
-google::protobuf::Closure* to_closure();
-
-rpc_channel::rpc_channel(std::string& host, int port):
+RpcChannel::RpcChannel(std::string& host, int port):
 		id(0)
 {
+	// socket.async_connect(endpoint, );
+}
+
+void RpcCommunicator::SendRequestHandler(int32 id, RpcHandlerPtr handler,
+		const boost::system::error_code& err)
+{
+	if (err)
+	{
+		handler->GetController()->SetFailed("failed");
+	}
+	else
+	{
+		handlers[id] = handler;
+	}
 }
 
-//void client_channel::register_service(const google::protobuf::Service service) {
-//	const google::protobuf::ServiceDescriptor* service_descriptor =
-//			service->GetDescriptor();
-//	for (int i = 0; i < service_descriptor; ++i)
-//	{
-//		const google::protobuf::MethodDescriptor* method =
-//				service_descriptor->method(i);
-//		std::string method_name(method->full_name());
-//		service_map_[method_name] = service;
-//	}
-//}
+void RpcChannel::SendRequest(const RpcRequest& request, RpcHandlerPtr handler)
+{
+	int size = request.ByteSize();
+	std::stringstream ss;
+	ss << size;
+	ss << request.SerializeAsString();
+	communicator.AsyncWrite(boost::asio::buffer(ss),
+			boost::bind(&RpcChannel::SendRequestHandler, this, request.id(),
+					handler, boost::asio::placeholders::error));
+}
 
-void rpc_channel::CallMethod(
+void RpcChannel::CallMethod(
 		const google::protobuf::MethodDescriptor* method,
 		google::protobuf::RpcController* controller,
 		const google::protobuf::Message* request,
 		google::protobuf::Message* response,
 		google::protobuf::Closure* done) {
-	rpc_request req;
+	RpcRequest req;
 	req.set_id(++id);
 	req.set_method(method->full_name());
 	req.set_request(request->SerializeAsString());
-	rpc_callback callback(controller, response, done);
-	communicator.send_message(req, callback);
+	RpcHandlerPtr handler = boost::make_shared<RpcHandler>(
+			controller, response, done);
+	SendRequest(req, handler);
 }
 
 } // namespace Hainan

+ 14 - 6
src/Net/RpcChannel.h

@@ -4,19 +4,27 @@
 #include <google/protobuf/service.h>
 #include <boost/unordered_map.hpp>
 #include <boost/asio.hpp>
-#include "base/base.h"
-#include "net/rpc_communicator.h"
+#include "Base/Base.h"
+#include "Net/RpcCommunicator.h"
 
 namespace Hainan {
 
-class rpc_channel: public google::protobuf::RpcChannel
+class RpcHandler;
+
+class RpcChannel: public google::protobuf::RpcChannel
 {
+	typedef boost::unordered_map<int32, RpcHandlerPtr> RpcCallbackMap;
 private:
 	int32 id;
-	rpc_communicator communicator;
+	RpcCallbackMap handlers;
+	RpcCommunicator communicator;
+
+	void SendRequestHandler(int32 id, RpcHandlerPtr handler,
+			const boost::system::error_code& err);
+	void SendRequest(const RpcRequest& request, RpcHandlerPtr handler);
 public:
-	rpc_channel(std::string& host, int port);
-	~rpc_channel();
+	RpcChannel(std::string& host, int port);
+	~RpcChannel();
 	virtual void CallMethod(
 			const google::protobuf::MethodDescriptor* method,
 			google::protobuf::RpcController* controller,

+ 18 - 3
src/Net/RpcCommunicator.cc

@@ -1,11 +1,26 @@
-#include "net/rpc_communicator.h"
+#include "Net/RpcCommunicator.h"
 
 namespace Hainan {
 
-void rpc_communicator::send_message(
-		const rpc_request& req, rpc_callback& callback)
+RpcCommunicator::RpcCommunicator(std::string& host, int port):
+		io_service(), socket(io_service)
 {
+	boost::asio::ip::address address;
+	address.from_string(host);
+	boost::asio::ip::tcp::endpoint endpoint(address, port);
+	socket.async_connect(endpoint,
+			boost::bind(&RpcCommunicator::AsynConnectHandler, this,
+					boost::asio::placeholders::error));
+}
 
+RpcCommunicator::~RpcCommunicator()
+{
+}
+
+void RpcCommunicator::AsyncWrite(boost::asio::buffer buffer,
+		boost::function<void (const boost::asio::error_code&)> handler)
+{
+	boost::asio::async_write(socket, buffer, handler);
 }
 
 } // namespace Hainan

+ 7 - 6
src/Net/RpcCommunicator.h

@@ -5,17 +5,18 @@
 
 namespace Hainan {
 
-class rpc_request;
-class rpc_callback;
+class RpcRequest;
 
-class rpc_communicator
+class RpcCommunicator
 {
 private:
+	boost::asio::io_service io_service;
 	boost::asio::ip::tcp::socket socket;
 public:
-	rpc_communicator(boost::asio::ip::tcp::endpoint& endpoint);
-	~rpc_communicator();
-	void send_message(const rpc_request& req, rpc_callback& callback);
+	RpcCommunicator(std::string& host, int port);
+	~RpcCommunicator();
+	void AsyncWrite(boost::asio::buffer buffer,
+			boost::function<void (const boost::asio::error_code&)> handler);
 };
 
 }

+ 1 - 1
src/Net/RpcController.cc

@@ -1 +1 @@
-#include "net/rpc_controller.h"
+#include "Net/RpcController.h"

+ 1 - 1
src/Net/RpcController.h

@@ -5,7 +5,7 @@
 
 namespace Hainan {
 
-class rpc_controller: public google::protobuf::RpcController
+class RpcController: public google::protobuf::RpcController
 {
 public:
 

+ 27 - 0
src/Net/RpcHandler.cc

@@ -0,0 +1,27 @@
+#include "Net/RpcHandler.h"
+
+namespace Hainan {
+
+RpcHandler::RpcHandler(google::protobuf::RpcController* p_controller,
+		google::protobuf::Message* p_response,
+		google::protobuf::Closure* p_done):
+		controller(p_controller), response(p_response), done(p_done)
+{
+}
+
+google::protobuf::RpcController *RpcHandler::GetController() const
+{
+    return controller;
+}
+
+google::protobuf::Closure *RpcHandler::GetDone() const
+{
+    return done;
+}
+
+google::protobuf::Message *RpcHandler::GetResponse() const
+{
+    return response;
+}
+
+} // namespace Hainan

+ 31 - 0
src/Net/RpcHandler.h

@@ -0,0 +1,31 @@
+#ifndef NET_RPC_HANDLER_H
+#define NET_RPC_HANDLER_H
+
+#include "boost/shared_ptr.hpp"
+
+namespace Hainan {
+
+class google::protobuf::RpcController;
+class google::protobuf::Message;
+class google::protobuf::Closure;
+
+class RpcHandler
+{
+private:
+	google::protobuf::RpcController* controller;
+	google::protobuf::Message* response;
+	google::protobuf::Closure* done;
+public:
+	RpcHandler(google::protobuf::RpcController* p_controller,
+			google::protobuf::Message* p_response,
+			google::protobuf::Closure* p_done);
+    google::protobuf::RpcController *GetController() const;
+    google::protobuf::Closure *GetDone() const;
+    google::protobuf::Message *GetResponse() const;
+};
+
+typedef boost::shared_ptr<RpcHandler> RpcHandlerPtr;
+
+} // namespace Hainan
+
+#endif // NET_RPC_HANDLER_H

+ 7 - 7
src/Net/RpcProtobufData.proto

@@ -1,21 +1,21 @@
-package hainan;
+package Hainan;
 
-enum response_type
+enum ResponseType
 {
 	RESPONSE_TYPE_OK = 1;
 	RESPONSE_TYPE_ERROR = 2;
 }
 
-message rpc_request
+message RpcRequest
 {
-	required uint32 id = 1;
+	required int32 id = 1;
 	required string method = 2;
 	optional bytes request = 3;
 }
 
-message rpc_response
+message RpcResponse
 {
-	required uint32 id = 1;
-	required response_type type = 2;
+	required int32 id = 1;
+	required ResponseType type = 2;
 	optional bytes response = 3;
 }

+ 10 - 10
src/Thread/ThreadPool.cc

@@ -3,10 +3,15 @@
 
 namespace Hainan {
 
-ThreadPool::ThreadPool() :
-	num(0), running(false), work_num(0)
+ThreadPool::ThreadPool(int num) :
+	thread_num(num), running(false), work_num(0)
 {
+	if (thread_num == 0)
+	{
+		thread_num = boost::thread::hardware_concurrency();
+	}
 }
+
 ThreadPool::~ThreadPool()
 {
 }
@@ -14,14 +19,14 @@ ThreadPool::~ThreadPool()
 void ThreadPool::Start()
 {
 	running = true;
-	for (int i = 0; i < num; ++i)
+	for (int i = 0; i < thread_num; ++i)
 	{
 		thread_ptr t(new boost::thread(
 				boost::bind(&ThreadPool::Runner, this)));
 		threads.push_back(t);
 		t->detach();
 	}
-	work_num = num;
+	work_num = thread_num;
 }
 
 void ThreadPool::Stop()
@@ -71,7 +76,7 @@ void ThreadPool::Runner()
 			task();
 		}
 	}
-	if (__sync_sub_and_fetch(&work_num, 1) == 0)
+	if (boost::detail::atomic_increment(&work_num) == 0)
 	{
 		VLOG(3) << "work_num = " << work_num;
 		done.notify_one();
@@ -94,9 +99,4 @@ bool ThreadPool::PushTask(boost::function<void (void)> task)
 	return true;
 }
 
-void ThreadPool::SetNum(int num)
-{
-	this->num = num;
-}
-
 } // namespace Hainan

+ 3 - 4
src/Thread/ThreadPool.h

@@ -5,14 +5,14 @@
 #include <boost/thread.hpp>
 #include <boost/function.hpp>
 #include <boost/bind.hpp>
-#include "base/base.h"
+#include "Base/Base.h"
 
 namespace Hainan {
 
 class ThreadPool: private boost::noncopyable
 {
 private:
-	int num;
+	int thread_num;
 	volatile int work_num;
 	volatile bool running;
 	boost::mutex mutex;
@@ -23,11 +23,10 @@ private:
 
 	void Runner();
 public:
-	ThreadPool();
+	ThreadPool(int num = 0);
 	~ThreadPool();
 	void Start();
 	void Stop();
-	void SetNum(int num);
 	bool PushTask(boost::function<void (void)> task);
 };
 

+ 5 - 7
src/Thread/ThreadPoolTest.cc

@@ -7,11 +7,10 @@
 
 namespace Hainan {
 
-class thread_pool_test: public testing::Test
+class ThreadPoolTest: public testing::Test
 {
 	void SetUp()
 	{
-		pool.SetNum(10);
 		pool.Start();
 	}
 	void TearDown()
@@ -20,17 +19,16 @@ class thread_pool_test: public testing::Test
 protected:
 	ThreadPool pool;
 public:
-	thread_pool_test() :
-		pool()
+	ThreadPoolTest() : pool(10)
 	{
 	}
-	void max(int a, int b, int* z)
+	void Max(int a, int b, int* z)
 	{
 		*z = a > b? a : b;
 	}
 };
 
-TEST_F(thread_pool_test, Test1)
+TEST_F(ThreadPoolTest, Test1)
 {
 	std::vector<int> x(100, 8);
 	std::vector<int> y(100, 9);
@@ -38,7 +36,7 @@ TEST_F(thread_pool_test, Test1)
 	for (int i = 0; i < 100; ++i)
 	{
 		pool.push_task(
-				bind(&thread_pool_test::max, this, x[i], y[i], &z[i]));
+				boost::bind(&ThreadPoolTest::Max, this, x[i], y[i], &z[i]));
 	}
 	pool.Stop();
 	for (int i = 0; i < 100; ++i)