00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include <boost/thread.hpp>
00034 #include <boost/bind.hpp>
00035
00036 #include <cstdlib>
00037 #include <iostream>
00038 #include <algorithm>
00039
00040 #include "asio_server.hpp"
00041
00042 using namespace ana;
00043
00044 using boost::asio::ip::tcp;
00045
00046 asio_server::asio_server() :
00047 io_service_(),
00048 work_( io_service_ ),
00049 io_threads_(),
00050 acceptor_( NULL ),
00051 client_proxies_(),
00052 listening_(false),
00053 listener_( NULL ),
00054 connection_handler_( NULL ),
00055 last_client_proxy_( NULL ),
00056 stats_collector_( ),
00057 last_valid_operation_id_( ana::no_operation )
00058 {
00059 }
00060
00061 asio_server::~asio_server()
00062 {
00063 io_service_.stop();
00064
00065 std::list< boost::thread* >::iterator thread_it;
00066
00067 thread_it = io_threads_.begin();
00068
00069 while (thread_it != io_threads_.end())
00070 {
00071 (*thread_it)->join();
00072 thread_it = io_threads_.erase( thread_it );
00073 }
00074
00075
00076
00077 std::list<client_proxy*> copy( client_proxies_ );
00078
00079 for (std::list<client_proxy*>::iterator proxy_it = copy.begin();
00080 proxy_it != copy.end();
00081 ++proxy_it)
00082 {
00083 delete *proxy_it;
00084 }
00085
00086 assert( client_proxies_.empty() );
00087
00088 delete last_client_proxy_;
00089 }
00090
00091 server* ana::server::create()
00092 {
00093 return new asio_server();
00094 }
00095
00096 void asio_server::set_connection_handler( connection_handler* handler )
00097 {
00098 connection_handler_ = handler;
00099 }
00100
00101 void asio_server::run(port pt)
00102 {
00103 tcp::acceptor* new_acceptor(new tcp::acceptor( io_service_,
00104 tcp::endpoint(tcp::v4(), atoi( pt.c_str() ))));
00105
00106 acceptor_.reset(new_acceptor);
00107
00108 async_accept( connection_handler_ );
00109
00110 run_listener( );
00111
00112 io_threads_.push_back(
00113 new boost::thread( boost::bind( &boost::asio::io_service::run, &io_service_) ) );
00114 }
00115
00116 void asio_server::async_accept( connection_handler* handler )
00117 {
00118 try
00119 {
00120 last_client_proxy_ = new asio_client_proxy(io_service_, this);
00121
00122 acceptor_->async_accept(last_client_proxy_->socket(),
00123 boost::bind(&asio_server::handle_accept,
00124 this,
00125 boost::asio::placeholders::error,
00126 last_client_proxy_,
00127 handler));
00128 }
00129 catch( const std::exception&)
00130 {
00131 delete last_client_proxy_;
00132 }
00133 }
00134
00135 void asio_server::register_client(client_proxy* client)
00136 {
00137 client_proxies_.push_back(client);
00138
00139 if (listening_)
00140 {
00141 client->set_listener_handler( listener_ );
00142 client->run_listener( );
00143 }
00144 }
00145
00146 void asio_server::deregister_client(client_proxy* client)
00147 {
00148 client_proxies_.remove( client );
00149 }
00150
00151 void asio_server::handle_accept(const boost::system::error_code& ec,
00152 asio_client_proxy* client,
00153 connection_handler* handler )
00154 {
00155 if (! ec)
00156 {
00157 if ( raw_mode() )
00158 client->set_raw_data_mode();
00159
00160 register_client(client);
00161 handler->handle_connect( ec, client->id() );
00162 }
00163 else
00164 {
00165 std::cerr << "Server: Error accepting client connection." << std::endl;
00166 delete client;
00167 }
00168
00169 async_accept( handler );
00170 }
00171
00172
00173 void asio_server::run_listener( )
00174 {
00175 for (std::list<client_proxy*>::iterator it( client_proxies_.begin() );
00176 it != client_proxies_.end();
00177 ++it)
00178 {
00179 (*it)->set_listener_handler( listener_ );
00180 (*it)->run_listener( );
00181 }
00182 }
00183
00184 void asio_server::set_listener_handler( listener_handler* listener )
00185 {
00186 listening_ = true;
00187 listener_ = listener;
00188
00189 for (std::list<client_proxy*>::iterator it( client_proxies_.begin() );
00190 it != client_proxies_.end();
00191 ++it)
00192 {
00193 (*it)->set_listener_handler( listener_ );
00194 }
00195 }
00196
00197 ana::operation_id asio_server::send_one(net_id id,
00198 boost::asio::const_buffer buffer,
00199 send_handler* handler,
00200 send_type copy_buffer)
00201 {
00202 return send_if(buffer,
00203 handler,
00204 create_predicate( boost::bind( std::equal_to<net_id>(), id, _1) ),
00205 copy_buffer );
00206 }
00207
00208 ana::operation_id asio_server::send_all_except(net_id id,
00209 boost::asio::const_buffer buffer,
00210 send_handler* handler,
00211 send_type copy_buffer)
00212 {
00213 return send_if(buffer,
00214 handler,
00215 create_predicate ( boost::bind( std::not_equal_to<net_id>(), id, _1) ),
00216 copy_buffer );
00217 }
00218
00219
00220 ana::operation_id asio_server::send_if(boost::asio::const_buffer buffer,
00221 send_handler* handler,
00222 const client_predicate& predicate,
00223 send_type copy_buffer)
00224 {
00225
00226 ana::detail::shared_buffer s_buf( new ana::detail::copying_buffer( buffer, copy_buffer ) );
00227
00228
00229 bool at_least_one_holds_the_condition( false );
00230
00231 ana::operation_id this_operation_id = ana::no_operation;
00232
00233 for (std::list<client_proxy*>::iterator it(client_proxies_.begin());
00234 it != client_proxies_.end();
00235 ++it)
00236 {
00237 if ( predicate.selects( (*it)->id() ) )
00238 {
00239 if ( ! at_least_one_holds_the_condition )
00240 this_operation_id = ++last_valid_operation_id_;
00241
00242 (*it)->send(s_buf, handler, this, this_operation_id);
00243 at_least_one_holds_the_condition = true;
00244 }
00245 }
00246
00247 return this_operation_id;
00248 }
00249
00250 ana::operation_id asio_server::send_all(boost::asio::const_buffer buffer,
00251 send_handler* handler,
00252 send_type copy_buffer )
00253 {
00254
00255 ana::detail::shared_buffer s_buf(new ana::detail::copying_buffer( buffer, copy_buffer ) );
00256
00257
00258
00259 if ( client_proxies_.empty() )
00260 return ana::no_operation;
00261
00262 ++last_valid_operation_id_;
00263
00264 std::for_each(client_proxies_.begin(),
00265 client_proxies_.end(),
00266 boost::bind(&client_proxy::send, _1,
00267 s_buf, handler, this, last_valid_operation_id_));
00268
00269 return last_valid_operation_id_;
00270 }
00271
00272 std::string asio_server::ip_address( net_id id ) const
00273 {
00274 std::list<ana::server::client_proxy*>::const_iterator it;
00275
00276 it = std::find_if( client_proxies_.begin(), client_proxies_.end(),
00277 boost::bind( &client_proxy::id, _1) == id );
00278
00279 if ( it != client_proxies_.end() )
00280 return (*it)->ip_address();
00281 else
00282 return "";
00283 }
00284
00285 const ana::stats* asio_server::get_client_stats( ana::net_id id, ana::stat_type type ) const
00286 {
00287 std::list<ana::server::client_proxy*>::const_iterator it;
00288
00289 it = std::find_if( client_proxies_.begin(), client_proxies_.end(),
00290 boost::bind( &client_proxy::id, _1) == id );
00291
00292 if ( it != client_proxies_.end() )
00293 return (*it)->get_stats(type);
00294 else
00295 return NULL;
00296 }
00297
00298 void asio_server::log_receive( ana::read_buffer buffer )
00299 {
00300 stats_collector_.log_receive( buffer );
00301 }
00302
00303 ana::timer* asio_server::create_timer()
00304 {
00305 return new ana::timer( io_service_ );
00306 }
00307
00308 const ana::stats* asio_server::get_stats( ana::stat_type type ) const
00309 {
00310 return stats_collector_.get_stats( type );
00311 }
00312
00313
00314 asio_server::asio_client_proxy::asio_client_proxy(boost::asio::io_service& io_service,
00315 asio_proxy_manager* server) :
00316 client_proxy(),
00317 asio_listener(),
00318 socket_(io_service),
00319 manager_(server),
00320 stats_collector_( )
00321 {
00322 }
00323
00324 asio_server::asio_client_proxy::~asio_client_proxy()
00325 {
00326 manager_->deregister_client( this );
00327 socket_.close();
00328 }
00329
00330 tcp::socket& asio_server::asio_client_proxy::socket()
00331 {
00332 return socket_;
00333 }
00334
00335 void asio_server::disconnect( ana::net_id id )
00336 {
00337 std::list<ana::server::client_proxy*>::iterator it;
00338
00339 it = std::find_if( client_proxies_.begin(), client_proxies_.end(),
00340 boost::bind( &client_proxy::id, _1) == id );
00341
00342 if ( it != client_proxies_.end() )
00343 delete *it;
00344 }
00345
00346 void asio_server::disconnect()
00347 {
00348 io_service_.stop();
00349
00350 std::list< boost::thread* >::iterator it;
00351
00352 it = io_threads_.begin();
00353
00354 while (it != io_threads_.end())
00355 {
00356 (*it)->join();
00357 it = io_threads_.erase( it );
00358 }
00359
00360 for (std::list<client_proxy*>::iterator it = client_proxies_.begin();
00361 it != client_proxies_.end();
00362 ++it)
00363 {
00364 delete *it;
00365 }
00366
00367 client_proxies_.clear();
00368
00369 io_service_.reset();
00370 }
00371
00372 void asio_server::set_raw_buffer_max_size( size_t size)
00373 {
00374 for (std::list<client_proxy*>::iterator it = client_proxies_.begin();
00375 it != client_proxies_.end();
00376 ++it)
00377 {
00378 (*it)->set_raw_buffer_max_size( size );
00379 }
00380 }
00381
00382 ana::stats_collector& asio_server::stats_collector()
00383 {
00384 return stats_collector_;
00385 }
00386
00387 void asio_server::expecting_message( net_id id, size_t ms_until_timeout )
00388 {
00389 std::list<ana::server::client_proxy*>::iterator it;
00390
00391 it = std::find_if( client_proxies_.begin(), client_proxies_.end(),
00392 boost::bind( &client_proxy::id, _1) == id );
00393
00394 if ( it != client_proxies_.end() )
00395 (*it)->expecting_message( ms_until_timeout );
00396 }
00397
00398 void asio_server::set_header_first_mode( ana::net_id id )
00399 {
00400 std::list<ana::server::client_proxy*>::const_iterator it;
00401
00402 it = std::find_if( client_proxies_.begin(), client_proxies_.end(),
00403 boost::bind( &client_proxy::id, _1) == id );
00404
00405 if ( it != client_proxies_.end() )
00406 (*it)->set_header_first_mode();
00407 }
00408
00409 void asio_server::set_raw_data_mode( ana::net_id id )
00410 {
00411 std::list<ana::server::client_proxy*>::const_iterator it;
00412
00413 it = std::find_if( client_proxies_.begin(), client_proxies_.end(),
00414 boost::bind( &client_proxy::id, _1) == id );
00415
00416 if ( it != client_proxies_.end() )
00417 (*it)->set_raw_data_mode();
00418 }
00419
00420 const ana::stats* asio_server::asio_client_proxy::get_stats( ana::stat_type type ) const
00421 {
00422 return stats_collector_.get_stats( type );
00423 }
00424
00425 void asio_server::cancel_pending( )
00426 {
00427 std::for_each( client_proxies_.begin(), client_proxies_.end(),
00428 boost::bind(&ana::server::client_proxy::cancel_pending, _1 ) );
00429 }
00430
00431 void asio_server::cancel_pending( ana::net_id client_id )
00432 {
00433 std::list<ana::server::client_proxy*>::const_iterator it;
00434
00435 it = std::find_if( client_proxies_.begin(), client_proxies_.end(),
00436 boost::bind( &client_proxy::id, _1) == client_id );
00437
00438 if ( it != client_proxies_.end() )
00439 (*it)->cancel_pending();
00440 }
00441
00442
00443 void asio_server::asio_client_proxy::send(ana::detail::shared_buffer buffer,
00444 send_handler* handler,
00445 ana::detail::sender* sender,
00446 ana::operation_id op_id)
00447 {
00448 asio_sender::send( buffer, socket_, handler, sender, op_id );
00449 }
00450
00451 void asio_server::asio_client_proxy::disconnect_listener()
00452 {
00453 cancel_pending();
00454 delete this;
00455 }
00456
00457 void asio_server::asio_client_proxy::cancel_pending()
00458 {
00459 socket_.cancel();
00460 }
00461
00462
00463 std::string asio_server::asio_client_proxy::ip_address() const
00464 {
00465 return socket_.remote_endpoint().address().to_string();
00466 }
00467
00468 ana::stats_collector& asio_server::asio_client_proxy::stats_collector()
00469 {
00470 return stats_collector_;
00471 }
00472
00473 ana::timer* asio_server::asio_client_proxy::create_timer()
00474 {
00475 return new ana::timer( socket_.get_io_service() );
00476 }
00477
00478 void asio_server::asio_client_proxy::expecting_message( size_t ms_until_timeout )
00479 {
00480 wait_for_incoming_message( ms_until_timeout, id() );
00481 }