RpcClient.cc 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. #include <boost/asio.hpp>
  2. #include <boost/bind.hpp>
  3. #include <boost/make_shared.hpp>
  4. #include <google/protobuf/message.h>
  5. #include <google/protobuf/descriptor.h>
  6. #include <glog/logging.h>
  7. #include "Rpc/RpcCommunicator.h"
  8. #include "Rpc/RpcClient.h"
  9. #include "Rpc/RequestHandler.h"
  10. namespace Egametang {
  11. RpcClient::RpcClient(boost::asio::io_service& ioService, std::string host, int port):
  12. RpcCommunicator(ioService), id(0)
  13. {
  14. // another thread?
  15. boost::asio::ip::address address;
  16. address.from_string(host);
  17. boost::asio::ip::tcp::endpoint endpoint(address, port);
  18. socket.async_connect(endpoint,
  19. boost::bind(&RpcClient::OnAsyncConnect, this,
  20. boost::asio::placeholders::error));
  21. }
  22. RpcClient::~RpcClient()
  23. {
  24. }
  25. void RpcClient::OnAsyncConnect(const boost::system::error_code& err)
  26. {
  27. if (err)
  28. {
  29. return;
  30. }
  31. RpcMetaPtr recvMeta = boost::make_shared<RpcMeta>();
  32. StringPtr recvMessage = boost::make_shared<std::string>();
  33. RecvMeta(recvMeta, recvMessage);
  34. }
  35. void RpcClient::OnRecvMessage(RpcMetaPtr meta, StringPtr message)
  36. {
  37. // 没有回调
  38. if (requestHandlers.find(meta->id) == requestHandlers.end())
  39. {
  40. // meta和message可以循环利用
  41. RecvMeta(meta, message);
  42. }
  43. else
  44. {
  45. RequestHandlerPtr requestHandler = requestHandlers[meta->id];
  46. requestHandlers.erase(meta->id);
  47. requestHandler->Response()->ParseFromString(*message);
  48. // meta和message可以循环利用
  49. RecvMeta(meta, message);
  50. // 回调放在函数最.如果RecvMeta()放在回调之后,
  51. // 另外线程可能让io_service stop,导致RecvMeta还未跑完
  52. // 网络就终止了
  53. requestHandler->Run();
  54. }
  55. }
  56. void RpcClient::OnSendMessage(RpcMetaPtr meta, StringPtr message)
  57. {
  58. }
  59. void RpcClient::CallMethod(
  60. const google::protobuf::MethodDescriptor* method,
  61. google::protobuf::RpcController* controller,
  62. const google::protobuf::Message* request,
  63. google::protobuf::Message* response,
  64. google::protobuf::Closure* done)
  65. {
  66. if (done)
  67. {
  68. RequestHandlerPtr request_handler = boost::make_shared<RequestHandler>(response, done);
  69. requestHandlers[++id] = request_handler;
  70. }
  71. boost::hash<std::string> stringHash;
  72. StringPtr message = boost::make_shared<std::string>();
  73. request->SerializePartialToString(message.get());
  74. RpcMetaPtr meta = boost::make_shared<RpcMeta>();
  75. meta->size = message->size();
  76. meta->id = id;
  77. meta->method = stringHash(method->full_name());
  78. SendMeta(meta, message);
  79. }
  80. } // namespace Egametang