RpcChannel.cc 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. #include <boost/asio.hpp>
  2. #include <boost/make_shared.hpp>
  3. #include <google/protobuf/message.h>
  4. #include "Net/RpcChannel.h"
  5. namespace Hainan {
  6. RpcChannel::RpcChannel(
  7. boost::asio::io_service& service, std::string& host, int port):
  8. io_service(service)
  9. {
  10. // another thread?
  11. boost::asio::ip::address address;
  12. address.from_string(host);
  13. boost::asio::ip::tcp::endpoint endpoint(address, port);
  14. socket.async_connect(endpoint,
  15. boost::bind(&RpcChannel::AsyncConnectHandler, this,
  16. boost::asio::placeholders::error));
  17. }
  18. void RpcChannel::AsyncConnectHandler(const boost::system::error_code& err)
  19. {
  20. if (err)
  21. {
  22. LOG(ERROR) << "async connect failed";
  23. return;
  24. }
  25. RecvMessegeSize();
  26. }
  27. void RpcChannel::RecvMessegeSize()
  28. {
  29. IntPtr size(new int);
  30. boost::asio::async_read(socket,
  31. boost::asio::buffer(
  32. reinterpret_cast<char*>(size.get()), sizeof(int)),
  33. boost::bind(&RpcChannel::RecvMessage, this, size,
  34. boost::asio::placeholders::error));
  35. }
  36. void RpcChannel::RecvMessage(IntPtr size, const boost::system::error_code& err)
  37. {
  38. if (err)
  39. {
  40. LOG(ERROR) << "receive response size failed";
  41. return;
  42. }
  43. StringPtr ss;
  44. boost::asio::async_read(socket,
  45. boost::asio::buffer(*ss, *size),
  46. boost::bind(&RpcChannel::RecvMessageHandler, this, ss,
  47. boost::asio::placeholders::error));
  48. }
  49. void RpcChannel::RecvMessageHandler(
  50. StringPtr ss, const boost::system::error_code& err)
  51. {
  52. if (err)
  53. {
  54. LOG(ERROR) << "receive response failed";
  55. return;
  56. }
  57. RpcResponse response;
  58. Response->ParseFromString(*ss);
  59. RpcHandlerPtr handler = handlers[response.id()];
  60. handler->GetResponse()->ParseFromString(response.response());
  61. handlers.erase(response.id());
  62. // read size
  63. RecvMessegeSize();
  64. }
  65. void RpcChannel::SendMessageHandler(int32 id, RpcHandlerPtr handler,
  66. const boost::system::error_code& err)
  67. {
  68. if (err)
  69. {
  70. LOG(ERROR) << "SendMessage error:";
  71. return;
  72. }
  73. handlers[id] = handler;
  74. }
  75. void RpcChannel::SendMessage(const RpcRequestPtr request,
  76. RpcHandlerPtr handler, const boost::system::error_code& err)
  77. {
  78. if (err)
  79. {
  80. LOG(ERROR) << "SendRequestSize error:";
  81. return;
  82. }
  83. std::string ss = request->SerializeAsString();
  84. boost::asio::async_write(socket, boost::asio::buffer(ss),
  85. boost::bind(&RpcChannel::SendMessageHandler, this, request->id(),
  86. handler, boost::asio::placeholders::error));
  87. }
  88. void RpcChannel::SendMessageSize(
  89. const RpcRequestPtr request, RpcHandlerPtr handler)
  90. {
  91. int size = request->ByteSize();
  92. std::string ss = boost::lexical_cast(size);
  93. boost::asio::async_write(socket, boost::asio::buffer(ss),
  94. boost::bind(&RpcChannel::SendMessage, this, request,
  95. handler, boost::asio::placeholders::error));
  96. }
  97. void RpcChannel::CallMethod(
  98. const google::protobuf::MethodDescriptor* method,
  99. google::protobuf::RpcController* controller,
  100. const google::protobuf::Message* request,
  101. google::protobuf::Message* response,
  102. google::protobuf::Closure* done)
  103. {
  104. RpcRequestPtr req(new RpcRequest);
  105. req->set_id(++id);
  106. req->set_method(method->full_name());
  107. req->set_request(request->SerializeAsString());
  108. RpcHandlerPtr handler(new RpcHandler(controller, response, done));
  109. SendMessageSize(req, handler);
  110. }
  111. } // namespace Hainan