| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- #include <boost/asio.hpp>
- #include <boost/foreach.hpp>
- #include <boost/bind.hpp>
- #include <google/protobuf/service.h>
- #include <google/protobuf/descriptor.h>
- #include <glog/logging.h>
- #include "Base/Marcos.h"
- #include "Rpc/RpcTypedef.h"
- #include "Rpc/RpcServer.h"
- #include "Rpc/RpcSession.h"
- #include "Rpc/ResponseHandler.h"
- #include "Rpc/MethodInfo.h"
- namespace Egametang {
- RpcServer::RpcServer(boost::asio::io_service& service, int port):
- io_service(service), acceptor(io_service),
- thread_pool(), sessions(),
- methods()
- {
- boost::asio::ip::address address;
- address.from_string("127.0.0.1");
- boost::asio::ip::tcp::endpoint endpoint(address, port);
- acceptor.open(endpoint.protocol());
- acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
- acceptor.bind(endpoint);
- acceptor.listen();
- RpcSessionPtr new_session(new RpcSession(io_service, *this));
- acceptor.async_accept(new_session->Socket(),
- boost::bind(&RpcServer::OnAsyncAccept, this,
- new_session, boost::asio::placeholders::error));
- }
- RpcServer::~RpcServer()
- {
- }
- void RpcServer::OnAsyncAccept(RpcSessionPtr session, const boost::system::error_code& err)
- {
- if (err)
- {
- LOG(ERROR) << "accept fail: " << err.message();
- return;
- }
- session->Start();
- sessions.insert(session);
- RpcSessionPtr new_session(new RpcSession(io_service, *this));
- acceptor.async_accept(new_session->Socket(),
- boost::bind(&RpcServer::OnAsyncAccept, this,
- new_session, boost::asio::placeholders::error));
- }
- void RpcServer::OnCallMethod(RpcSessionPtr session, ResponseHandlerPtr response_handler)
- {
- // 调度到网络线程
- session->Socket().get_io_service().post(
- boost::bind(&ResponseHandler::Run, response_handler));
- }
- void RpcServer::HandleStop()
- {
- acceptor.close();
- sessions.clear();
- }
- void RpcServer::Stop()
- {
- thread_pool.Wait();
- // 调度到io_service线程,防止两个线程竞争
- io_service.post(boost::bind(&RpcServer::HandleStop, shared_from_this()));
- }
- void RpcServer::RunService(RpcSessionPtr session, RpcMetaPtr meta,
- StringPtr message, MessageHandler message_handler)
- {
- MethodInfoPtr method_info = methods[meta->method];
- ResponseHandlerPtr response_handler(
- new ResponseHandler(method_info, meta->id, message_handler));
- response_handler->Request()->ParseFromString(*message);
- google::protobuf::Closure* done = google::protobuf::NewCallback(
- this, &RpcServer::OnCallMethod,
- session, response_handler);
- thread_pool.Schedule(
- boost::bind(&google::protobuf::Service::CallMethod, method_info->service,
- response_handler->Method(), (google::protobuf::RpcController*)(NULL),
- response_handler->Request(), response_handler->Response(),
- done));
- }
- void RpcServer::Register(RpcServicePtr service)
- {
- boost::hash<std::string> string_hash;
- const google::protobuf::ServiceDescriptor* service_descriptor = service->GetDescriptor();
- for (int i = 0; i < service_descriptor->method_count(); ++i)
- {
- const google::protobuf::MethodDescriptor* method_descriptor =
- service_descriptor->method(i);
- std::size_t method_hash = string_hash(method_descriptor->full_name());
- MethodInfoPtr method_info(new MethodInfo(service, method_descriptor));
- CHECK(methods.find(method_hash) == methods.end());
- methods[method_hash] = method_info;
- }
- }
- void RpcServer::Remove(RpcSessionPtr& session)
- {
- sessions.erase(session);
- }
- } // namespace Egametang
|