RpcChannel.cc 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. #include <boost/asio.hpp>
  2. #include <boost/bind.hpp>
  3. #include <glog/logging.h>
  4. #include <google/protobuf/message.h>
  5. #include <google/protobuf/descriptor.h>
  6. #include "Rpc/RpcCommunicator.h"
  7. #include "Rpc/RpcChannel.h"
  8. #include "Rpc/RequestHandler.h"
  9. namespace Egametang {
  10. RpcChannel::RpcChannel(boost::asio::io_service& io_service, std::string host, int port):
  11. RpcCommunicator(io_service), id(0)
  12. {
  13. // another thread?
  14. boost::asio::ip::address address;
  15. address.from_string(host);
  16. boost::asio::ip::tcp::endpoint endpoint(address, port);
  17. socket.async_connect(endpoint,
  18. boost::bind(&RpcChannel::OnAsyncConnect, this,
  19. boost::asio::placeholders::error));
  20. }
  21. RpcChannel::~RpcChannel()
  22. {
  23. }
  24. void RpcChannel::OnAsyncConnect(const boost::system::error_code& err)
  25. {
  26. if (err)
  27. {
  28. LOG(ERROR) << "async connect failed: " << err.message();
  29. return;
  30. }
  31. RpcMetaPtr recv_meta(new RpcMeta());
  32. StringPtr recv_message(new std::string);
  33. RecvMeta(recv_meta, recv_message);
  34. }
  35. void RpcChannel::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
  36. {
  37. RequestHandlerPtr request_handler = request_handlers[meta->id];
  38. request_handlers.erase(meta->id);
  39. request_handler->Response()->ParseFromString(*message);
  40. // meta和message可以循环利用
  41. RecvMeta(meta, message);
  42. // 回调放在函数最后.如果RecvMeta()放在回调之后,
  43. // 另外线程可能让io_service stop,导致RecvMeta还未跑完
  44. // 网络就终止了
  45. request_handler->Run();
  46. }
  47. void RpcChannel::OnSendMessage(RpcMetaPtr meta, StringPtr message)
  48. {
  49. }
  50. void RpcChannel::HandleStop()
  51. {
  52. socket.close();
  53. }
  54. void RpcChannel::Stop()
  55. {
  56. // 把socket.close()调度到io_service线程,
  57. // 防止两个线程同时操作socket
  58. io_service.post(boost::bind(&RpcChannel::HandleStop, shared_from_this()));
  59. }
  60. void RpcChannel::CallMethod(
  61. const google::protobuf::MethodDescriptor* method,
  62. google::protobuf::RpcController* controller,
  63. const google::protobuf::Message* request,
  64. google::protobuf::Message* response,
  65. google::protobuf::Closure* done)
  66. {
  67. RequestHandlerPtr request_handler(new RequestHandler(response, done));
  68. request_handlers[++id] = request_handler;
  69. boost::hash<std::string> string_hash;
  70. StringPtr message(new std::string);
  71. request->SerializePartialToString(message.get());
  72. RpcMetaPtr meta(new RpcMeta());
  73. meta->size = message->size();
  74. meta->id = id;
  75. meta->method = string_hash(method->full_name());
  76. SendMeta(meta, message);
  77. }
  78. } // namespace Egametang