chat_server.cc 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. //
  2. // chat_server.cpp
  3. // ~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2010 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #include <algorithm>
  11. #include <cstdlib>
  12. #include <deque>
  13. #include <iostream>
  14. #include <list>
  15. #include <set>
  16. #include <boost/bind.hpp>
  17. #include <boost/shared_ptr.hpp>
  18. #include <boost/enable_shared_from_this.hpp>
  19. #include <boost/asio.hpp>
  20. #include "Experimental/chat/chat_message.h"
  21. using boost::asio::ip::tcp;
  22. //----------------------------------------------------------------------
  23. typedef std::deque<chat_message> chat_message_queue;
  24. //----------------------------------------------------------------------
  25. class chat_participant
  26. {
  27. public:
  28. virtual ~chat_participant()
  29. {
  30. }
  31. virtual void deliver(const chat_message& msg) = 0;
  32. };
  33. typedef boost::shared_ptr<chat_participant> chat_participant_ptr;
  34. //----------------------------------------------------------------------
  35. class chat_room
  36. {
  37. public:
  38. void join(chat_participant_ptr participant)
  39. {
  40. participants_.insert(participant);
  41. std::for_each(recent_msgs_.begin(), recent_msgs_.end(), boost::bind(
  42. &chat_participant::deliver, participant, _1));
  43. }
  44. void leave(chat_participant_ptr participant)
  45. {
  46. participants_.erase(participant);
  47. }
  48. void deliver(const chat_message& msg)
  49. {
  50. recent_msgs_.push_back(msg);
  51. while (recent_msgs_.size() > max_recent_msgs)
  52. recent_msgs_.pop_front();
  53. std::for_each(participants_.begin(), participants_.end(), boost::bind(
  54. &chat_participant::deliver, _1, boost::ref(msg)));
  55. }
  56. private:
  57. std::set<chat_participant_ptr> participants_;
  58. enum
  59. {
  60. max_recent_msgs = 100
  61. };
  62. chat_message_queue recent_msgs_;
  63. };
  64. //----------------------------------------------------------------------
  65. class chat_session: public chat_participant,
  66. public boost::enable_shared_from_this<chat_session>
  67. {
  68. public:
  69. chat_session(boost::asio::io_service& io_service, chat_room& room) :
  70. socket_(io_service), room_(room)
  71. {
  72. }
  73. tcp::socket& socket()
  74. {
  75. return socket_;
  76. }
  77. void start()
  78. {
  79. room_.join(shared_from_this());
  80. boost::asio::async_read(socket_, boost::asio::buffer(read_msg_.data(),
  81. chat_message::header_length), boost::bind(
  82. &chat_session::handle_read_header, shared_from_this(),
  83. boost::asio::placeholders::error));
  84. }
  85. void deliver(const chat_message& msg)
  86. {
  87. bool write_in_progress = !write_msgs_.empty();
  88. write_msgs_.push_back(msg);
  89. if (!write_in_progress)
  90. {
  91. boost::asio::async_write(socket_, boost::asio::buffer(
  92. write_msgs_.front().data(), write_msgs_.front().length()),
  93. boost::bind(&chat_session::handle_write,
  94. shared_from_this(),
  95. boost::asio::placeholders::error));
  96. }
  97. }
  98. void handle_read_header(const boost::system::error_code& error)
  99. {
  100. if (!error && read_msg_.decode_header())
  101. {
  102. boost::asio::async_read(socket_, boost::asio::buffer(
  103. read_msg_.body(), read_msg_.body_length()), boost::bind(
  104. &chat_session::handle_read_body, shared_from_this(),
  105. boost::asio::placeholders::error));
  106. }
  107. else
  108. {
  109. room_.leave(shared_from_this());
  110. }
  111. }
  112. void handle_read_body(const boost::system::error_code& error)
  113. {
  114. if (!error)
  115. {
  116. room_.deliver(read_msg_);
  117. boost::asio::async_read(socket_, boost::asio::buffer(
  118. read_msg_.data(), chat_message::header_length),
  119. boost::bind(&chat_session::handle_read_header,
  120. shared_from_this(),
  121. boost::asio::placeholders::error));
  122. }
  123. else
  124. {
  125. room_.leave(shared_from_this());
  126. }
  127. }
  128. void handle_write(const boost::system::error_code& error)
  129. {
  130. if (!error)
  131. {
  132. write_msgs_.pop_front();
  133. if (!write_msgs_.empty())
  134. {
  135. boost::asio::async_write(socket_, boost::asio::buffer(
  136. write_msgs_.front().data(),
  137. write_msgs_.front().length()), boost::bind(
  138. &chat_session::handle_write, shared_from_this(),
  139. boost::asio::placeholders::error));
  140. }
  141. }
  142. else
  143. {
  144. room_.leave(shared_from_this());
  145. }
  146. }
  147. private:
  148. tcp::socket socket_;
  149. chat_room& room_;
  150. chat_message read_msg_;
  151. chat_message_queue write_msgs_;
  152. };
  153. typedef boost::shared_ptr<chat_session> chat_session_ptr;
  154. //----------------------------------------------------------------------
  155. class chat_server
  156. {
  157. public:
  158. chat_server(boost::asio::io_service& io_service,
  159. const tcp::endpoint& endpoint) :
  160. io_service_(io_service), acceptor_(io_service, endpoint)
  161. {
  162. chat_session_ptr new_session(new chat_session(io_service_, room_));
  163. acceptor_.async_accept(new_session->socket(), boost::bind(
  164. &chat_server::handle_accept, this, new_session,
  165. boost::asio::placeholders::error));
  166. }
  167. void handle_accept(chat_session_ptr session,
  168. const boost::system::error_code& error)
  169. {
  170. if (!error)
  171. {
  172. session->start();
  173. chat_session_ptr new_session(new chat_session(io_service_, room_));
  174. acceptor_.async_accept(new_session->socket(), boost::bind(
  175. &chat_server::handle_accept, this, new_session,
  176. boost::asio::placeholders::error));
  177. }
  178. }
  179. private:
  180. boost::asio::io_service& io_service_;
  181. tcp::acceptor acceptor_;
  182. chat_room room_;
  183. };
  184. typedef boost::shared_ptr<chat_server> chat_server_ptr;
  185. typedef std::list<chat_server_ptr> chat_server_list;
  186. //----------------------------------------------------------------------
  187. int main(int argc, char* argv[])
  188. {
  189. try
  190. {
  191. if (argc < 2)
  192. {
  193. std::cerr << "Usage: chat_server <port> [<port> ...]\n";
  194. return 1;
  195. }
  196. boost::asio::io_service io_service;
  197. chat_server_list servers;
  198. for (int i = 1; i < argc; ++i)
  199. {
  200. using namespace std;
  201. // For atoi.
  202. tcp::endpoint endpoint(tcp::v4(), atoi(argv[i]));
  203. chat_server_ptr server(new chat_server(io_service, endpoint));
  204. servers.push_back(server);
  205. }
  206. io_service.run();
  207. }
  208. catch (std::exception& e)
  209. {
  210. std::cerr << "Exception: " << e.what() << "\n";
  211. }
  212. return 0;
  213. }