RpcClient.cc 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. #include <functional>
  2. #include <boost/asio.hpp>
  3. #include <google/protobuf/message.h>
  4. #include <google/protobuf/descriptor.h>
  5. #include <glog/logging.h>
  6. #include "Rpc/RpcCommunicator.h"
  7. #include "Rpc/RpcClient.h"
  8. #include "Rpc/RequestHandler.h"
  9. namespace Egametang {
  10. RpcClient::RpcClient(boost::asio::io_service& ioService, std::string host, int port):
  11. RpcCommunicator(ioService), id(0)
  12. {
  13. // another thread?
  14. boost::asio::ip::address address;
  15. address.from_string(host);
  16. boost::asio::ip::tcp::endpoint endpoint(address, port);
  17. socket.async_connect(endpoint,
  18. std::bind(&RpcClient::OnAsyncConnect, this,
  19. std::placeholders::_1));
  20. }
  21. RpcClient::~RpcClient()
  22. {
  23. }
  24. void RpcClient::OnAsyncConnect(const boost::system::error_code& err)
  25. {
  26. if (err)
  27. {
  28. return;
  29. }
  30. auto recvMeta = std::make_shared<RpcMeta>();
  31. auto recvMessage = std::make_shared<std::string>();
  32. RecvMeta(recvMeta, recvMessage);
  33. }
  34. void RpcClient::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
  35. {
  36. // 没有回调
  37. if (requestHandlers.find(meta->id) == requestHandlers.end())
  38. {
  39. // meta和message可以循环利用
  40. RecvMeta(meta, message);
  41. }
  42. else
  43. {
  44. auto requestHandler = requestHandlers[meta->id];
  45. requestHandlers.erase(meta->id);
  46. requestHandler->Response()->ParseFromString(*message);
  47. // meta和message可以循环利用
  48. RecvMeta(meta, message);
  49. // 回调放在函数最.如果RecvMeta()放在回调之后,
  50. // 另外线程可能让io_service stop,导致RecvMeta还未跑完
  51. // 网络就终止了
  52. requestHandler->Run();
  53. }
  54. }
  55. void RpcClient::OnSendMessage(RpcMetaPtr meta, StringPtr message)
  56. {
  57. }
  58. void RpcClient::CallMethod(
  59. const google::protobuf::MethodDescriptor* method,
  60. google::protobuf::RpcController* controller,
  61. const google::protobuf::Message* request,
  62. google::protobuf::Message* response,
  63. google::protobuf::Closure* done)
  64. {
  65. if (done)
  66. {
  67. auto request_handler = std::make_shared<RequestHandler>(response, done);
  68. requestHandlers[++id] = request_handler;
  69. }
  70. std::hash<std::string> stringHash;
  71. auto message = std::make_shared<std::string>();
  72. request->SerializePartialToString(message.get());
  73. auto meta = std::make_shared<RpcMeta>();
  74. meta->size = message->size();
  75. meta->id = id;
  76. meta->method = stringHash(method->full_name());
  77. SendMeta(meta, message);
  78. }
  79. } // namespace Egametang