RpcSession.cc 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. #include "Net/RpcSession.h"
  2. namespace Egametang {
  3. RpcSession::RpcSession(RpcServer& rpc_server): rpc_server_(rpc_server)
  4. {
  5. }
  6. boost::asio::ip::tcp::socket& RpcSession::Socket()
  7. {
  8. return socket_;
  9. }
  10. void RpcSession::SendMessageHandler(int32 id, RpcHandlerPtr handler,
  11. const boost::system::error_code& err)
  12. {
  13. if (err)
  14. {
  15. LOG(ERROR) << "SendMessage error:";
  16. return;
  17. }
  18. }
  19. void RpcSession::SendMessage(const RpcResponsePtr response, const boost::system::error_code& err)
  20. {
  21. if (err)
  22. {
  23. return;
  24. }
  25. std::string ss = response->SerializeAsString();
  26. boost::asio::async_write(socket_, boost::asio::buffer(ss),
  27. boost::bind(&RpcSession::SendMessageHandler, this,
  28. response->id(), boost::asio::placeholders::error));
  29. }
  30. void RpcSession::SendMessageSize(RpcResponsePtr response)
  31. {
  32. int size = response->ByteSize();
  33. std::string ss = boost::lexical_cast(size);
  34. boost::asio::async_write(socket_, boost::asio::buffer(ss),
  35. boost::bind(&RpcSession::SendMessage, this,
  36. response, boost::asio::placeholders::error));
  37. }
  38. ///////////////////////////
  39. void RpcSession::RecvMessegeSize()
  40. {
  41. IntPtr size(new int);
  42. boost::asio::async_read(socket_,
  43. boost::asio::buffer(
  44. reinterpret_cast<char*>(size.get()), sizeof(int)),
  45. boost::bind(&RpcSession::RecvMessage, this, size,
  46. boost::asio::placeholders::error));
  47. }
  48. void RpcSession::RecvMessage(IntPtr size, const boost::system::error_code& err)
  49. {
  50. if (err)
  51. {
  52. LOG(ERROR) << "receive request size failed";
  53. return;
  54. }
  55. StringPtr ss(new std::string);
  56. boost::asio::async_read(socket_,
  57. boost::asio::buffer(*ss, *size),
  58. boost::bind(&RpcSession::RecvMessageHandler, this, ss,
  59. boost::asio::placeholders::error));
  60. }
  61. void RpcSession::RecvMessageHandler(StringPtr ss, const boost::system::error_code& err)
  62. {
  63. if (err)
  64. {
  65. LOG(ERROR) << "receive request message failed";
  66. return;
  67. }
  68. RpcRequestPtr request(new RpcRequest);
  69. request->ParseFromString(*ss);
  70. RpcResponsePtr response(new RpcResponse);
  71. response->set_id(request->id_());
  72. rpc_server_.RunService(shared_from_this(), request,
  73. boost::bind(&RpcSession::SendMessegeSize, shared_from_this(), response));
  74. // read size
  75. RecvMessegeSize();
  76. }
  77. void RpcSession::Start()
  78. {
  79. RecvMessegeSize();
  80. }
  81. void RpcSession::Stop()
  82. {
  83. socket_.close();
  84. sessions_.erase(shared_from_this());
  85. }
  86. }