RpcChannel.cc 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. #include <boost/asio.hpp>
  2. #include <boost/make_shared.hpp>
  3. #include <google/protobuf/message.h>
  4. #include "Net/RpcChannel.h"
  5. namespace Hainan {
  6. RpcChannel::RpcChannel(std::string& host, int port):
  7. id(0), io_service()
  8. {
  9. // another thread?
  10. boost::asio::ip::address address;
  11. address.from_string(host);
  12. boost::asio::ip::tcp::endpoint endpoint(address, port);
  13. socket.async_connect(endpoint,
  14. boost::bind(&RpcChannel::AsyncConnectHandler, this,
  15. boost::asio::placeholders::error));
  16. }
  17. void RpcChannel::AsyncConnectHandler(const boost::system::error_code& err) {
  18. if (err)
  19. {
  20. LOG(ERROR) << "async connect failed";
  21. return;
  22. }
  23. RecvRequestSize();
  24. }
  25. void RpcChannel::RecvRequestSize()
  26. {
  27. IntPtr size(new int);
  28. boost::asio::async_read(socket,
  29. boost::asio::buffer(
  30. reinterpret_cast<char*>(size.get()), sizeof(int)),
  31. boost::bind(&RpcChannel::RecvMessage, this, size,
  32. boost::asio::placeholders::error));
  33. }
  34. void RpcChannel::RecvMessage(IntPtr size, const boost::system::error_code& err)
  35. {
  36. if (err)
  37. {
  38. LOG(ERROR) << "receive response size failed";
  39. return;
  40. }
  41. StringPtr ss;
  42. boost::asio::async_read(socket,
  43. boost::asio::buffer(*ss, *size),
  44. boost::bind(&RpcChannel::RecvMessageHandler, this, ss,
  45. boost::asio::placeholders::error));
  46. }
  47. void RpcChannel::RecvMessageHandler(StringPtr ss,
  48. const boost::system::error_code& err)
  49. {
  50. if (err)
  51. {
  52. LOG(ERROR) << "receive response failed";
  53. return;
  54. }
  55. RpcResponse response;
  56. Response->ParseFromString(*ss);
  57. RpcHandlerPtr handler = handlers[response.id()];
  58. handler->GetResponse()->ParseFromString(response.response());
  59. handlers.erase(response.id());
  60. // read size
  61. RecvRequestSize();
  62. }
  63. void RpcChannel::SendMessageHandler(int32 id, RpcHandlerPtr handler,
  64. const boost::system::error_code& err)
  65. {
  66. if (err)
  67. {
  68. LOG(ERROR) << "SendMessage error:";
  69. return;
  70. }
  71. handlers[id] = handler;
  72. }
  73. void RpcChannel::SendMessage(const RpcRequestPtr request,
  74. RpcHandlerPtr handler, const boost::system::error_code& err)
  75. {
  76. if (err)
  77. {
  78. LOG(ERROR) << "SendRequestSize error:";
  79. return;
  80. }
  81. std::string ss = request->SerializeAsString();
  82. boost::asio::async_write(socket, boost::asio::buffer(ss),
  83. boost::bind(&RpcChannel::SendMessageHandler, this, request->id(),
  84. handler, boost::asio::placeholders::error));
  85. }
  86. void RpcChannel::SendRequestSize(const RpcRequestPtr request,
  87. RpcHandlerPtr handler)
  88. {
  89. int size = request->ByteSize();
  90. std::string ss = boost::lexical_cast(size);
  91. boost::asio::async_write(socket, boost::asio::buffer(ss),
  92. boost::bind(&RpcChannel::SendMessage, this, request,
  93. handler, boost::asio::placeholders::error));
  94. }
  95. void RpcChannel::CallMethod(
  96. const google::protobuf::MethodDescriptor* method,
  97. google::protobuf::RpcController* controller,
  98. const google::protobuf::Message* request,
  99. google::protobuf::Message* response,
  100. google::protobuf::Closure* done)
  101. {
  102. RpcRequestPtr req(new RpcRequest);
  103. req->set_id(++id);
  104. req->set_method(method->full_name());
  105. req->set_request(request->SerializeAsString());
  106. RpcHandlerPtr handler(new RpcHandler(controller, response, done));
  107. SendRequestSize(req, handler);
  108. }
  109. } // namespace Hainan