RpcChannel.cc 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. #include <boost/asio.hpp>
  2. #include <boost/make_shared.hpp>
  3. #include <google/protobuf/message.h>
  4. #include "Net/RpcChannel.h"
  5. #include "Net/RpcCommunicator.h"
  6. namespace Hainan {
  7. RpcChannel::RpcChannel(std::string& host, int port):
  8. id(0), communicator(host, port)
  9. {
  10. // another thread?
  11. RecvMessage();
  12. }
  13. void RpcChannel::RecvResponseHandler(StringPtr ss,
  14. const boost::asio::error_code& err)
  15. {
  16. if (err)
  17. {
  18. LOG(ERROR) << "receive response failed";
  19. return;
  20. }
  21. RpcResponse response;
  22. Response->ParseFromString(*ss);
  23. RpcHandlerPtr handler = handlers[response.id()];
  24. handler->GetResponse()->ParseFromString(response.response());
  25. handlers.erase(response.id());
  26. RecvMessage();
  27. }
  28. void RpcChannel::RecvSizeHandler(IntPtr size,
  29. const boost::asio::error_code& err)
  30. {
  31. if (err)
  32. {
  33. LOG(ERROR) << "receive response size failed";
  34. return;
  35. }
  36. StringPtr ss;
  37. boost::asio::async_read(socket,
  38. boost::asio::buffer(*ss, *size),
  39. boost::bind(&RpcChannel::RecvResponseHandler, this, ss,
  40. boost::asio::placeholders::error));
  41. }
  42. void RpcChannel::RecvMessage()
  43. {
  44. IntPtr size(new int);
  45. boost::asio::async_read(socket,
  46. boost::asio::buffer(
  47. reinterpret_cast<char*>(size.get()), sizeof(int)),
  48. boost::bind(&RpcChannel::RecvSizeHandler, this, size,
  49. boost::asio::placeholders::error));
  50. }
  51. void RpcChannel::SendRequestHandler(int32 id, RpcHandlerPtr handler,
  52. const boost::asio::error_code& err)
  53. {
  54. if (err)
  55. {
  56. LOG(ERROR) << "SendRequestHandler error:" << e.what();
  57. return;
  58. }
  59. handlers[id] = handler;
  60. }
  61. void RpcChannel::SendSizeHandler(const RpcRequestPtr request,
  62. RpcHandlerPtr handler, const boost::asio::error_code& err)
  63. {
  64. if (err)
  65. {
  66. LOG(ERROR) << "SendSizeHandler error:" << e.what();
  67. return;
  68. }
  69. std::string ss = request->SerializeAsString();
  70. boost::asio::async_write(socket, boost::asio::buffer(ss),
  71. boost::bind(&RpcChannel::SendRequestHandler, this, request->id(),
  72. handler, boost::asio::placeholders::error));
  73. }
  74. void RpcChannel::SendMessage(const RpcRequestPtr request, RpcHandlerPtr handler)
  75. {
  76. int size = request->ByteSize();
  77. std::string ss = boost::lexical_cast(size);
  78. boost::asio::async_write(socket, boost::asio::buffer(ss),
  79. boost::bind(&RpcChannel::SendSizeHandler, this, request,
  80. handler, boost::asio::placeholders::error));
  81. }
  82. void RpcChannel::CallMethod(
  83. const google::protobuf::MethodDescriptor* method,
  84. google::protobuf::RpcController* controller,
  85. const google::protobuf::Message* request,
  86. google::protobuf::Message* response,
  87. google::protobuf::Closure* done)
  88. {
  89. RpcRequestPtr req(new RpcRequest);
  90. req->set_id(++id);
  91. req->set_method(method->full_name());
  92. req->set_request(request->SerializeAsString());
  93. RpcHandlerPtr handler(new RpcHandler(controller, response, done));
  94. SendMessage(req, handler);
  95. }
  96. } // namespace Hainan