// RpcServer.cc
  1. #include <boost/asio.hpp>
  2. #include <boost/foreach.hpp>
  3. #include <google/protobuf/service.h>
  4. #include "Base/Base.h"
  5. #include "Net/RpcServer.h"
  6. #include "Net/RpcSession.h"
  7. #include "Thread/ThreadPool.h"
  8. namespace Egametang {
  9. RpcServer::RpcServer(boost::asio::io_service& io_service, int port, ThreadPool& thread_pool):
  10. io_service_(io_service), thread_pool_(thread_pool)
  11. {
  12. boost::asio::ip::address address;
  13. address.from_string("localhost");
  14. boost::asio::ip::tcp::endpoint endpoint(address, port);
  15. acceptor_.open(endpoint.protocol());
  16. acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
  17. acceptor_.bind(endpoint);
  18. acceptor_.listen();
  19. RpcSessionPtr new_session(new RpcSession(sessions_));
  20. acceptor_.async_accept(new_session->socket(),
  21. boost::bind(&RpcServer::HandleAsyncAccept, this,
  22. boost::asio::placeholders::error));
  23. }
  24. void RpcServer::HandleAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err)
  25. {
  26. if (err)
  27. {
  28. return;
  29. }
  30. session->Start();
  31. sessions_.insert(session);
  32. RpcSessionPtr new_session(new RpcSession(*this));
  33. acceptor_.async_accept(new_session->socket(),
  34. boost::bind(&RpcServer::HandleAsyncAccept, this,
  35. boost::asio::placeholders::error));
  36. }
  37. void RpcServer::Callback(RpcSessionPtr session,
  38. boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler)
  39. {
  40. session->socket.get_io_service().post(handler);
  41. }
  42. void RpcServer::Stop()
  43. {
  44. acceptor_.close();
  45. foreach(RpcSessionPtr session, sessions_)
  46. {
  47. session->Stop();
  48. }
  49. sessions_.clear();
  50. }
/**
 * Schedules a protobuf service call for `request` on thread_pool_.
 *
 * A protobuf Closure is created that invokes
 * RpcServer::Callback(session, handler); it is passed as the `done` argument
 * of the bound Service::CallMethod call, which is pushed to thread_pool_.
 *
 * @param session session forwarded to Callback.
 * @param request request message; its raw pointer is bound into the task.
 * @param handler callable forwarded to Callback.
 *
 * NOTE(review): this function looks unfinished -
 *  - `method` is not declared in this function; presumably a
 *    MethodDescriptor* looked up from the request - confirm.
 *  - google::protobuf::Service::CallMethod takes five arguments
 *    (method, controller, request, response, done); no response message is
 *    supplied here.
 *  - NULL is bound untyped; the controller parameter is an RpcController*,
 *    so a typed null pointer is likely needed.
 *  - only request.get() is bound, so the task does not share ownership of
 *    the request - verify the request outlives the task.
 */
void RpcServer::RunService(RpcSessionPtr session, RpcRequestPtr request,
boost::function<void (RpcSessionPtr, RpcResponsePtr)> handler)
{
// Closure that will call this->Callback(session, handler) when run.
google::protobuf::Closure* done = google::protobuf::NewCallback(
this, &RpcServer::Callback, session, handler);
// Hand the service call to the thread pool.
thread_pool_.PushTask(
boost::bind(&google::protobuf::Service::CallMethod, &service_,
method, NULL, request.get(), done));
}
/**
 * Placeholder: the body is empty, so `service` is currently ignored and
 * nothing is registered.
 *
 * @param service service to register (unused for now).
 */
void RpcServer::RegisterService(ProtobufServicePtr service)
{
// TODO: not implemented yet.
}
  63. void RpcServer::Start()
  64. {
  65. io_service_.run();
  66. }
  67. } // namespace Egametang