RpcServer.cc 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. #include <boost/asio.hpp>
  2. #include <boost/foreach.hpp>
  3. #include <google/protobuf/service.h>
  4. #include "Base/Base.h"
  5. #include "Net/RpcServer.h"
  6. #include "Net/RpcSession.h"
  7. #include "Thread/ThreadPool.h"
  8. namespace Hainan {
  9. RpcServer::RpcServer(google::protobuf::Service& pservice, int port):
  10. service(pservice), io_service(), thread_pool()
  11. {
  12. boost::asio::ip::address address;
  13. address.from_string("localhost");
  14. boost::asio::ip::tcp::endpoint endpoint(address, port);
  15. acceptor.open(endpoint.protocol());
  16. acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
  17. acceptor.bind(endpoint);
  18. acceptor.listen();
  19. RpcSessionPtr new_session(new RpcSession(sessions));
  20. acceptor.async_accept(new_session->socket(),
  21. boost::bind(&RpcServer::HandleAsyncAccept, this,
  22. boost::asio::placeholders::error));
  23. }
  24. void RpcServer::HandleAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err)
  25. {
  26. if (err)
  27. {
  28. return;
  29. }
  30. session->Start();
  31. sessions.insert(session);
  32. RpcSessionPtr new_session(new RpcSession(io_service, sessions));
  33. acceptor.async_accept(new_session->socket(),
  34. boost::bind(&RpcServer::HandleAsyncAccept, this,
  35. boost::asio::placeholders::error));
  36. }
  37. void RpcServer::Callback(RpcSessionPtr session,
  38. boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler)
  39. {
  40. session->socket.get_io_service().post(handler);
  41. }
  42. void RpcServer::RunService(RpcSessionPtr session, RpcRequestPtr request,
  43. boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler)
  44. {
  45. google::protobuf::Closure* done = google::protobuf::NewCallback(
  46. &RpcServer::Callback, shared_from_this(), session, handler);
  47. thread_pool.PushTask(
  48. boost::bind(&google::protobuf::Service::CallMethod, &service,
  49. method, NULL, request.get(), done));
  50. }
  51. void RpcServer::Start()
  52. {
  53. io_service.run();
  54. }
  55. void RpcServer::Stop()
  56. {
  57. acceptor.close();
  58. foreach(RpcSessionPtr session, rpc_server->sessions)
  59. {
  60. session->stop();
  61. }
  62. sessions.clear();
  63. }
  64. } // namespace Hainan