network_manager_ana.cpp

Go to the documentation of this file.
00001 /* $Id: network_manager_ana.cpp 52533 2012-01-07 02:35:17Z shadowmaster $ */
00002 
00003 /**
00004  * @file
00005  * @brief Implementation file for network features using ana.
00006  *
00007  * Copyright (C) 2010 - 2012 Guillermo Biset.
00008  *
00009  * Part of the Battle for Wesnoth Project http://www.wesnoth.org/
00010  *
00011  * This program is free software; you can redistribute it and/or modify
00012  * it under the terms of the GNU General Public License as published by
00013  * the Free Software Foundation; either version 2 of the License, or
00014  * (at your option) any later version.
00015  * This program is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY.
00017  *
00018  * See the COPYING file for more details.
00019  */
00020 
00021 #include <iostream>
00022 
00023 #include <boost/bind.hpp>
00024 #include <boost/iostreams/filtering_stream.hpp>
00025 #include <boost/iostreams/filter/gzip.hpp>
00026 
00027 #include "serialization/parser.hpp"
00028 
00029 #include "network_manager_ana.hpp"
00030 #include "serialization/binary_or_text.hpp"
00031 
00032 #include "gettext.hpp"
00033 
00034 // Begin ana_send_handler implementation ----------------------------------------------------------
00035 
00036 ana_send_handler::ana_send_handler( size_t calls ) :
00037     mutex_(),
00038     target_calls_( calls ),
00039     error_code_()
00040 {
00041     if ( calls > 0 )
00042         mutex_.lock();
00043 }
00044 
00045 ana_send_handler::~ana_send_handler()
00046 {
00047     if ( target_calls_ > 0 )
00048         throw std::runtime_error("Handler wasn't called enough times.");
00049     mutex_.lock();
00050     mutex_.unlock();
00051 }
00052 
00053 void ana_send_handler::handle_send(ana::error_code   error_code,
00054                                    ana::net_id       /*client*/,
00055                                    ana::operation_id /*op_id*/)
00056 {
00057     error_code_ = error_code;
00058 
00059     if ( --target_calls_ == 0 )
00060         mutex_.unlock();
00061 }
00062 
00063 void ana_send_handler::wait_completion()
00064 {
00065     mutex_.lock();
00066     mutex_.unlock();
00067 }
00068 
00069 // Begin ana_handshake_finisher_handler implementation --------------------------------------------
00070 
00071 ana_handshake_finisher_handler::ana_handshake_finisher_handler( ana::server*     server,
00072                                                                 clients_manager* mgr)
00073     : server_( server ),
00074       manager_( mgr )
00075 {
00076 }
00077 
00078 ana_handshake_finisher_handler::~ana_handshake_finisher_handler()
00079 {
00080 }
00081 
00082 void ana_handshake_finisher_handler::handle_send(ana::error_code   ec,
00083                                                  ana::net_id       client,
00084                                                  ana::operation_id /*op_id*/)
00085 {
00086     if ( ec )
00087         server_->disconnect( client );
00088     else
00089         manager_->connected( client );
00090 
00091     delete this;
00092 }
00093 
00094 // Begin ana_receive_handler implementation -------------------------------------------------------
00095 
00096 ana_receive_handler::ana_receive_handler( ana_component_set::iterator iterator ) :
00097     iterator_( iterator ),
00098     mutex_(),
00099     handler_mutex_(),
00100     timeout_called_mutex_(),
00101     error_code_(),
00102     receive_timer_( NULL ),
00103     finished_( false )
00104 {
00105     mutex_.lock();
00106     timeout_called_mutex_.lock();
00107 }
00108 
00109 ana_receive_handler::~ana_receive_handler()
00110 {
00111     timeout_called_mutex_.lock();
00112     timeout_called_mutex_.unlock();
00113     handler_mutex_.lock();
00114     handler_mutex_.unlock();
00115 }
00116 
00117 void ana_receive_handler::wait_completion(ana::detail::timed_sender* component, size_t timeout_ms )
00118 {
00119     {
00120         boost::mutex::scoped_lock lock( handler_mutex_);
00121         if ( finished_ )
00122         {
00123             mutex_.unlock();
00124             timeout_called_mutex_.unlock();
00125         }
00126         else if ( timeout_ms > 0 )
00127         {
00128             receive_timer_ = component->create_timer();
00129 
00130             receive_timer_->wait( ana::time::milliseconds(timeout_ms),
00131                                 boost::bind(&ana_receive_handler::handle_timeout,
00132                                             this,
00133                                             ana::timeout_error ) );
00134         }
00135     }
00136 
00137     mutex_.lock();
00138     mutex_.unlock();
00139 }
00140 
00141 void ana_receive_handler::handle_receive(ana::error_code          error_c,
00142                                          ana::net_id              client,
00143                                          ana::read_buffer read_buffer)
00144 {
00145     boost::mutex::scoped_lock lock( handler_mutex_);
00146 
00147     delete receive_timer_;
00148     receive_timer_ = NULL;
00149 
00150     (*iterator_)->add_buffer( read_buffer, client );
00151 
00152     error_code_ = error_c;
00153 
00154     if (! finished_ )
00155     {
00156         finished_ = true;
00157         mutex_.unlock();
00158     }
00159 }
00160 
00161 void ana_receive_handler::handle_disconnect(ana::error_code error_c, ana::net_id)
00162 {
00163     boost::mutex::scoped_lock lock( handler_mutex_);
00164 
00165     delete receive_timer_;
00166     receive_timer_ = NULL;
00167 
00168     if (! finished_ )
00169     {
00170         error_code_ = error_c;
00171 
00172         finished_ = true;
00173         mutex_.unlock();
00174     }
00175 }
00176 
00177 void ana_receive_handler::handle_timeout(ana::error_code error_code)
00178 {
00179     boost::mutex::scoped_lock lock( handler_mutex_ );
00180 
00181     delete receive_timer_;
00182     receive_timer_ = NULL;
00183 
00184     if (! finished_ )
00185     {
00186         error_code_ = error_code;
00187 
00188         finished_ = true;
00189         mutex_.unlock();
00190     }
00191 
00192     timeout_called_mutex_.unlock();
00193 }
00194 
00195 // Begin ana_multiple_receive_handler implementation ----------------------------------------------
00196 
00197 ana_multiple_receive_handler::ana_multiple_receive_handler( ana_component_set& components ) :
00198     components_( components ),
00199     mutex_(),
00200     handler_mutex_(),
00201     timeout_called_mutex_(),
00202     error_code_(),
00203     buffer_(),
00204     wesnoth_id_(0),
00205     receive_timer_( NULL ),
00206     finished_( false )
00207 {
00208     throw std::runtime_error("Multiple receive handler constructed");
00209 
00210     ana_component_set::iterator it;
00211 
00212     for (it = components_.begin(); it != components_.end(); ++it )
00213     {
00214         if ( (*it)->is_server() )
00215             (*it)->server()->set_listener_handler( this );
00216         else
00217             (*it)->client()->set_listener_handler( this );
00218     }
00219 
00220     mutex_.lock();
00221     timeout_called_mutex_.lock();
00222 }
00223 
00224 ana_multiple_receive_handler::~ana_multiple_receive_handler()
00225 {
00226     timeout_called_mutex_.lock();
00227     timeout_called_mutex_.unlock();
00228     handler_mutex_.lock();
00229     handler_mutex_.unlock();
00230 }
00231 
00232 void ana_multiple_receive_handler::wait_completion(size_t timeout_ms )
00233 {
00234     ana_component_set::iterator it;
00235     {
00236         boost::mutex::scoped_lock lock( handler_mutex_);
00237 
00238         it = components_.begin();
00239 
00240         ana::detail::timed_sender* component;
00241 
00242         if ( (*it)->is_server())
00243             component = (*it)->server();
00244         else
00245             component = (*it)->client();
00246 
00247         if ( finished_ )
00248         {
00249             mutex_.unlock();
00250             timeout_called_mutex_.unlock();
00251         }
00252         else if ( timeout_ms > 0 )
00253         {
00254             receive_timer_ = component->create_timer();
00255 
00256             receive_timer_->wait( ana::time::milliseconds(timeout_ms),
00257                                 boost::bind(&ana_multiple_receive_handler::handle_timeout,
00258                                             this,
00259                                             ana::timeout_error ) );
00260         }
00261     }
00262     mutex_.lock();
00263     mutex_.unlock();
00264 }
00265 
00266 void ana_multiple_receive_handler::handle_receive(ana::error_code          error_c,
00267                                                   ana::net_id              id,
00268                                                   ana::read_buffer read_buffer)
00269 {
00270     boost::mutex::scoped_lock lock( handler_mutex_);
00271 
00272     delete receive_timer_;
00273     receive_timer_ = NULL;
00274 
00275     buffer_ = read_buffer;
00276     error_code_ = error_c;
00277 
00278     ana_component_set::iterator it;
00279     it = std::find_if( components_.begin(), components_.end(),
00280                     boost::bind(&ana_component::get_id, _1) == id );
00281 
00282     if ( it != components_.end())
00283         wesnoth_id_ = (*it)->get_wesnoth_id();
00284     else
00285         throw std::runtime_error("Wrong read.");
00286 
00287 
00288     if (! finished_ )
00289     {
00290         finished_ = true;
00291         mutex_.unlock();
00292     }
00293 }
00294 
00295 void ana_multiple_receive_handler::handle_disconnect(ana::error_code error_c, ana::net_id)
00296 {
00297     boost::mutex::scoped_lock lock( handler_mutex_);
00298 
00299     delete receive_timer_;
00300     receive_timer_ = NULL;
00301 
00302     error_code_ = error_c;
00303     if (! finished_ )
00304     {
00305         finished_ = true;
00306         mutex_.unlock();
00307     }
00308 }
00309 
00310 void ana_multiple_receive_handler::handle_timeout(ana::error_code error_code)
00311 {
00312     boost::mutex::scoped_lock lock( handler_mutex_ );
00313 
00314     delete receive_timer_;
00315     receive_timer_ = NULL;
00316 
00317     if (! finished_ )
00318     {
00319         error_code_ = error_code;
00320         finished_ = true;
00321         mutex_.unlock();
00322     }
00323 
00324     timeout_called_mutex_.unlock();
00325 }
00326 
00327 // Begin ana_connect_handler implementation -------------------------------------------------------
00328 
00329 ana_connect_handler::ana_connect_handler( ) :
00330     mutex_( ),
00331     error_code_()
00332 {
00333     mutex_.lock();
00334 }
00335 
00336 ana_connect_handler::~ana_connect_handler()
00337 {
00338     mutex_.lock();
00339     mutex_.unlock();
00340 }
00341 
00342 const ana::error_code& ana_connect_handler::error() const
00343 {
00344     return error_code_;
00345 }
00346 
00347 void ana_connect_handler::handle_connect(ana::error_code error_code, ana::net_id /*client*/)
00348 {
00349     error_code_ = error_code;
00350     mutex_.unlock();
00351 }
00352 
00353 void ana_connect_handler::wait_completion()
00354 {
00355     mutex_.lock();
00356     mutex_.unlock();
00357 }
00358 
00359 // Begin ana_component implementation -------------------------------------------------------------
00360 
00361 ana_component::ana_component( ) :
00362     base_( ana::server::create() ),
00363     is_server_( true ),
00364     id_( server()->id() ),
00365     wesnoth_id_( 0 ),
00366     mutex_(),
00367     condition_(),
00368     buffers_(),
00369     sender_ids_()
00370 {
00371 }
00372 
00373 ana_component::ana_component( const std::string& host, const std::string& port) :
00374     base_( ana::client::create(host,port) ),
00375     is_server_( false ),
00376     id_(  client()->id() ),
00377     wesnoth_id_( 0 ),       // will change to the received id after connection
00378     mutex_(),
00379     condition_(),
00380     buffers_(),
00381     sender_ids_()
00382 {
00383 }
00384 
00385 ana_component::~ana_component( )
00386 {
00387     if ( is_server() )
00388         delete server();
00389     else
00390         delete client();
00391 }
00392 
00393 network::statistics ana_component::get_send_stats() const
00394 {
00395     ana::stats_collector& stats = listener()->stats_collector();
00396 
00397     network::statistics result;
00398 
00399     result.current     = stats.current_packet_out_total();
00400     result.current_max = stats.current_packet_out_size();
00401     result.total       = stats.get_stats( ana::ACCUMULATED )->bytes_out();
00402 
00403     return result;
00404 }
00405 
00406 network::statistics ana_component::get_receive_stats() const
00407 {
00408     ana::stats_collector& stats = listener()->stats_collector();
00409 
00410     network::statistics result;
00411 
00412     result.current     = stats.current_packet_in_total();
00413     result.current_max = stats.current_packet_in_size();
00414     result.total       = stats.get_stats( ana::ACCUMULATED )->bytes_in();
00415 
00416     return result;
00417 }
00418 
00419 ana::server* ana_component::server() const
00420 {
00421     if( ! is_server_ )
00422         throw std::runtime_error("Component is not a server.");
00423 
00424     return boost::get<ana::server*>(base_);
00425 }
00426 
00427 ana::client* ana_component::client() const
00428 {
00429     if( is_server_ )
00430         throw std::runtime_error("Component is not a client.");
00431 
00432     return boost::get<ana::client*>(base_);
00433 }
00434 
00435 ana::detail::listener* ana_component::listener() const
00436 {
00437     if( is_server_ )
00438         return server();
00439     else
00440         return client();
00441 }
00442 
00443 bool ana_component::is_server() const
00444 {
00445     return is_server_;
00446 }
00447 
00448 bool ana_component::is_client() const
00449 {
00450     return ! is_server_;
00451 }
00452 
00453 ana::net_id ana_component::get_id() const
00454 {
00455     return id_;
00456 }
00457 
00458 network::connection ana_component::get_wesnoth_id() const
00459 {
00460     return wesnoth_id_;
00461 }
00462 
00463 void ana_component::set_wesnoth_id( network::connection id )
00464 {
00465     wesnoth_id_ = id;
00466 }
00467 
00468 const ana::stats* ana_component::get_stats( ana::stat_type type ) const
00469 {
00470     return listener()->get_stats( type );
00471 }
00472 
00473 void ana_component::add_buffer(ana::read_buffer buffer, ana::net_id id)
00474 {
00475     {
00476         boost::lock_guard<boost::mutex> lock(mutex_);
00477         buffers_.push( buffer );
00478 
00479         if ( is_server_ )
00480             sender_ids_.push( id );
00481     }
00482     condition_.notify_all();
00483 }
00484 
00485 ana::read_buffer ana_component::wait_for_element()
00486 {
00487     boost::unique_lock<boost::mutex> lock(mutex_);
00488 
00489     while(buffers_.empty())
00490         condition_.wait(lock);
00491 
00492     const ana::read_buffer buffer_ret = buffers_.front();
00493 
00494     buffers_.pop();
00495 
00496     return buffer_ret;
00497 }
00498 
00499 network::connection ana_component::oldest_sender_id_still_pending()
00500 {
00501     boost::unique_lock<boost::mutex> lock(mutex_);
00502 
00503     if ( sender_ids_.empty())
00504         throw std::runtime_error("No pending buffer.");
00505 
00506     const network::connection id = sender_ids_.front();
00507 
00508     sender_ids_.pop();
00509 
00510     return id;
00511 }
00512 
00513 // Begin clients_manager  implementation ----------------------------------------------------------
00514 
00515 clients_manager::clients_manager( ana::server* server) :
00516     server_( server ),
00517     ids_(),
00518     pending_ids_(),
00519     pending_handshakes_()
00520 {
00521 }
00522 
00523 size_t clients_manager::client_amount() const
00524 {
00525     return ids_.size();
00526 }
00527 
00528 void clients_manager::handle_connect(ana::error_code error, ana::net_id client)
00529 {
00530     if (! error )
00531     {
00532         ids_.insert( client );
00533         pending_handshakes_.insert( client );
00534     }
00535 }
00536 
00537 void clients_manager::handle_disconnect(ana::error_code /*error*/, ana::net_id client)
00538 {
00539     ids_.erase(client);
00540     pending_ids_.erase( network::connection( client ) );
00541 }
00542 
00543 void clients_manager::connected( ana::net_id id )
00544 {
00545     pending_ids_.insert( network::connection( id ) );
00546 }
00547 
00548 void clients_manager::remove( ana::net_id id )
00549 {
00550     ids_.erase( id );
00551     pending_ids_.erase( network::connection( id ) );
00552     pending_handshakes_.erase( id );
00553 }
00554 
00555 
00556 void clients_manager::handshaked( ana::net_id id )
00557 {
00558     pending_handshakes_.erase( id );
00559 }
00560 
00561 bool clients_manager::has_connection_pending() const
00562 {
00563     return ! pending_ids_.empty();
00564 }
00565 
00566 bool clients_manager::is_pending_handshake( ana::net_id id ) const
00567 {
00568     return pending_handshakes_.find( id ) != pending_handshakes_.end();
00569 }
00570 
00571 bool clients_manager::is_a_client( ana::net_id id ) const
00572 {
00573     return ids_.find( id ) != ids_.end();
00574 }
00575 
00576 network::connection clients_manager::get_pending_connection_id()
00577 {
00578     const network::connection result = *pending_ids_.begin();
00579     pending_ids_.erase( pending_ids_.begin() );
00580     return result;
00581 }
00582 
00583 // Begin ana_network_manager implementation -------------------------------------------------------
00584 
00585 ana_network_manager::ana_network_manager() :
00586     connect_timer_( NULL ),
00587     components_(),
00588     server_manager_(),
00589     disconnected_components_(),
00590     disconnected_ids_(),
00591     proxy_settings_()
00592 {
00593 }
00594 
00595 ana::net_id ana_network_manager::create_server( )
00596 {
00597     ana_component* new_component = new ana_component( );
00598     components_.insert( new_component );
00599 
00600     ana::server* server = new_component->server();
00601 
00602     clients_manager* manager = new clients_manager( server );
00603     server_manager_[ server ] = manager;
00604 
00605     server->set_connection_handler( manager );
00606     server->set_listener_handler( this );
00607     server->set_raw_data_mode();
00608 
00609     return server->id();
00610 }
00611 
00612 network::connection ana_network_manager::create_client_and_connect(std::string host, int port)
00613 {
00614     ana::net_id new_client_id = ana::invalid_net_id;
00615 
00616     try
00617     {
00618         std::stringstream ss;
00619         ss << port;
00620 
00621         ana_component* new_component = new ana_component( host, ss.str() );
00622         components_.insert( new_component );
00623 
00624         ana::client* const client = new_component->client();
00625 
00626         new_client_id = client->id();
00627 
00628         ana_connect_handler handler;
00629 
00630         client->set_raw_data_mode();
00631         client->set_connect_timeout( ana::time::seconds(10) );
00632 
00633         if ( proxy_settings_.enabled )
00634             client->connect_through_proxy( proxy_settings_.address,
00635                                            proxy_settings_.port,
00636                                            &handler,
00637                                            proxy_settings_.user,
00638                                            proxy_settings_.password);
00639         else
00640             client->connect( &handler );
00641 
00642         client->set_listener_handler( this );
00643         client->run();
00644 
00645         handler.wait_completion(); // just wait for handler to finish
00646 
00647         if( handler.error() )
00648         {
00649             network::disconnect( client->id() );
00650             throw network::error(_("Could not connect to host"), client->id() );
00651         }
00652         else
00653         {
00654             //Send handshake
00655             ana::serializer::bostream bos;
00656 
00657             ana::ana_uint32 handshake( 0 );
00658             bos << handshake;
00659 
00660             ana_send_handler send_handler;
00661 
00662             client->send( ana::buffer( bos.str()), &send_handler); //, ana::ZERO_COPY );
00663 
00664             send_handler.wait_completion();
00665 
00666             if ( send_handler.error() )
00667                 throw network::error(_("Could not connect to host"), client->id() );
00668             else
00669             {
00670                 ana::ana_uint32 my_id;
00671                 ana::serializer::bistream bis;
00672 
00673                 client->wait_raw_object(bis, sizeof(ana::ana_uint32) );
00674 
00675                 bis >> my_id;
00676                 ana::network_to_host_long( my_id );
00677 
00678                 new_component->set_wesnoth_id( my_id );
00679 
00680                 client->set_header_first_mode();
00681                 client->run_listener();
00682 
00683                 return network::connection( client->id() );
00684             }
00685         }
00686     }
00687     catch( const std::exception& )
00688     {
00689         throw network::error(_("Could not connect to host"), new_client_id );
00690         return 0;
00691     }
00692 }
00693 
00694 network::connection ana_network_manager::new_connection_id( )
00695 {
00696     ana_component_set::iterator it;
00697 
00698     for (it = components_.begin(); it != components_.end(); ++it)
00699     {
00700         if ( (*it)->is_server() )
00701         {
00702             clients_manager* clients_mgr = server_manager_[ (*it)->server() ];
00703             if ( clients_mgr->has_connection_pending() )
00704                 return clients_mgr->get_pending_connection_id();
00705         }
00706     }
00707 
00708     // No new connection
00709     return 0;
00710 }
00711 
00712 
00713 const ana::stats* ana_network_manager::get_stats( network::connection connection_num,
00714                                                   ana::stat_type      type)
00715 {
00716     ana::net_id id( connection_num );
00717     std::set<ana_component*>::iterator it;
00718 
00719     if ( id == 0 )
00720     {
00721         if ( ! components_.empty() )
00722         {
00723             it = components_.begin();
00724             return (*it)->get_stats( type );
00725         }
00726         else
00727             return NULL;
00728     }
00729     else
00730     {
00731         it = std::find_if( components_.begin(), components_.end(),
00732                            boost::bind(std::logical_or<bool>(),
00733                            (boost::bind(&ana_component::get_wesnoth_id, _1) == connection_num),
00734                            (boost::bind(&ana_component::get_id, _1) == id ) ));
00735         //Make a broad attempt at finding it, test for both ANA's id and the assigned one.
00736 
00737         if ( it != components_.end())
00738             return (*it)->get_stats( type );
00739         else
00740         {
00741             for ( it = components_.begin() ; it != components_.end(); ++it)
00742             {
00743                 if ( (*it)->is_server() )
00744                 {
00745                     const ana::stats* res = (*it)->server()->get_client_stats(id,ana::ACCUMULATED);
00746                     if ( res != NULL )
00747                         return res;
00748                 }
00749             }
00750         }
00751 
00752         return NULL;
00753     }
00754 }
00755 
00756 void ana_network_manager::close_connections_and_cleanup()
00757 {
00758     for (ana_component_set::iterator it = components_.begin(); it != components_.end(); ++it)
00759         delete *it;
00760 
00761     std::map< ana::server*, clients_manager* >::iterator it;
00762     for ( it = server_manager_.begin(); it != server_manager_.end(); ++it)
00763         delete it->second;
00764 
00765     components_.clear();
00766     server_manager_.clear();
00767 }
00768 
00769 void ana_network_manager::throw_if_pending_disconnection()
00770 {
00771     if ( ! disconnected_components_.empty() )
00772     {
00773         ana_component* component = disconnected_components_.front();
00774         disconnected_components_.pop();
00775 
00776         const ana::net_id id = component->get_id(); // Should I use wesnoth_id here?
00777 
00778         delete component;
00779 
00780         throw network::error(_("Client disconnected"),id);
00781     }
00782 
00783     if ( ! disconnected_ids_.empty() )
00784     {
00785         ana::net_id id = disconnected_ids_.front();
00786         disconnected_ids_.pop();
00787         throw network::error(_("Client disconnected"),id);
00788     }
00789 }
00790 
00791 void ana_network_manager::run_server(ana::net_id id, int port)
00792 {
00793     std::stringstream ss;
00794     ss << port;
00795 
00796     ana_component_set::iterator it;
00797 
00798     it = std::find_if( components_.begin(), components_.end(),
00799                         boost::bind(&ana_component::get_id, _1) == id );
00800 
00801     if ( it == components_.end())
00802         throw std::runtime_error("No server with this id.");
00803     else
00804         if ( (*it)->is_server() )
00805             (*it)->server()->run( ss.str() );
00806         else
00807             throw std::runtime_error("Component is not a server.");
00808 
00809 }
00810 
00811 std::string ana_network_manager::ip_address( network::connection id )
00812 {
00813     std::set<ana_component*>::iterator it;
00814 
00815     for (it = components_.begin(); it != components_.end(); ++it)
00816     {
00817         if ( (*it)->is_server() )
00818         {
00819             const std::string ip = (*it)->server()->ip_address( ana::net_id( id ) );
00820             if  (ip != "")
00821                 return ip;
00822         }
00823     }
00824     return "";
00825 }
00826 
00827 size_t ana_network_manager::number_of_connections() const
00828 {
00829     // TODO:check if this is the intention
00830 
00831     size_t total(0);
00832 
00833     ana_component_set::const_iterator it;
00834 
00835     for (it = components_.begin(); it != components_.end(); ++it )
00836     {
00837         if ((*it)->is_client())
00838             ++total;
00839         else
00840         {
00841             std::map< ana::server*, clients_manager* >::const_iterator mgr = server_manager_.find(
00842 (*it)->server() );
00843             total += mgr->second->client_amount();
00844         }
00845     }
00846 
00847     return total;
00848 }
00849 
00850 
00851 std::string ana_network_manager::compress_config( const config& cfg )
00852 {
00853     std::ostringstream out;
00854     compress_config( cfg, out );
00855     return out.str( );
00856 }
00857 
00858 
00859 void ana_network_manager::compress_config( const config& cfg, std::ostringstream& out)
00860 {
00861     boost::iostreams::filtering_stream<boost::iostreams::output> filter;
00862     filter.push(boost::iostreams::gzip_compressor());
00863     filter.push(out);
00864     write(filter, cfg);
00865     out.flush();
00866 }
00867 
00868 
00869 void ana_network_manager::read_config( const ana::read_buffer& buffer, config& cfg)
00870 {
00871     std::istringstream input( buffer->string() );
00872 
00873     read_gz(cfg, input);
00874 }
00875 
00876 size_t ana_network_manager::send_all( const config& cfg )
00877 {
00878     const std::string output_string = compress_config(cfg);
00879 
00880     std::set<ana_component*>::iterator it;
00881 
00882     for (it = components_.begin(); it != components_.end(); ++it)
00883     {
00884         if ( (*it)->is_server() )
00885         {
00886             const size_t necessary_calls = server_manager_[ (*it)->server() ]->client_amount();
00887             ana_send_handler handler( necessary_calls );
00888 
00889             (*it)->server()->send_all( ana::buffer( output_string ), &handler); //, ana::ZERO_COPY);
00890             handler.wait_completion(); // the handler will release the mutex after
00891                                        // necessary_calls calls
00892         }
00893         else
00894         {
00895             ana_send_handler handler;
00896 
00897             (*it)->client()->send( ana::buffer( output_string ), &handler); //, ana::ZERO_COPY );
00898             handler.wait_completion();
00899         }
00900     }
00901     return output_string.size();
00902 }
00903 
00904 size_t ana_network_manager::send( network::connection connection_num ,
00905                                   const config&       cfg )
00906 {
00907     const std::string output_string = compress_config(cfg);
00908 
00909 
00910     return send_raw_data( output_string.c_str(), output_string.size(), connection_num );
00911 }
00912 
00913 size_t ana_network_manager::send_raw_data( const char*         base_char,
00914                                            size_t              size,
00915                                            network::connection connection_num )
00916 {
00917     ana::net_id id( connection_num );
00918     ana_component_set::iterator it;
00919 
00920     it = std::find_if( components_.begin(), components_.end(),
00921                    boost::bind(std::logical_or<bool>(),
00922                        (boost::bind(&ana_component::get_wesnoth_id, _1) == connection_num),
00923                        (boost::bind(&ana_component::get_id, _1) == id ) ));
00924     //Make a broad attempt at finding it, test for both ANA's id and the assigned one.
00925 
00926     if ( it != components_.end())
00927     {
00928         if ( (*it)->is_server() )
00929             throw std::runtime_error("Can't send to the server itself.");
00930 
00931         ana_send_handler handler;
00932         (*it)->client()->send( ana::buffer( base_char, size ), &handler); //, ana::ZERO_COPY);
00933         handler.wait_completion();
00934 
00935         if ( handler.error() )
00936             return 0;
00937         else
00938             return size;
00939     }
00940     else
00941     {
00942         if ( components_.empty() )
00943             return 0;
00944         else
00945         {
00946             it = components_.begin();
00947 
00948             if ((*it)->is_server())
00949             {
00950                 ana_send_handler handler;                   //, ana::ZERO_COPY);
00951                 (*it)->server()->send_one( id, ana::buffer( base_char, size ), &handler);
00952                 handler.wait_completion();
00953                 if ( handler.error() )
00954                     return 0;
00955                 else
00956                     return size;
00957             }
00958             else
00959             {
00960                 ana_send_handler handler;
00961                 (*it)->client()->send( ana::buffer( base_char, size ), &handler);
00962                 handler.wait_completion();
00963 
00964                 if ( handler.error() )
00965                     return 0;
00966                 else
00967                     return size;
00968             }
00969         }
00970     }
00971 }
00972 
00973 void ana_network_manager::send_all_except(const config& cfg, network::connection connection_num)
00974 {
00975     const std::string output_string = compress_config(cfg);
00976 
00977 
00978     ana_component_set::iterator it;
00979 
00980     ana::net_id id_to_avoid( connection_num ); //I should have issued this id earlier
00981 
00982     for ( it = components_.begin(); it != components_.end(); ++it)
00983     {
00984         if ((*it)->is_server())
00985         {
00986             if ( (*it)->get_id() != id_to_avoid )
00987             {
00988                 if ( server_manager_[ (*it)->server() ]->is_a_client( id_to_avoid ) )
00989                 {
00990                     const size_t clients_receiving_number
00991                                 = server_manager_[ (*it)->server() ]->client_amount() - 1;
00992 
00993                     ana_send_handler handler( clients_receiving_number );
00994                     (*it)->server()->send_all_except( id_to_avoid,
00995                                                       ana::buffer( output_string ),
00996                                                       &handler); //, ana::ZERO_COPY);
00997                     handler.wait_completion();
00998                 }
00999             }
01000         }
01001         else
01002         {
01003             if ( (*it)->get_wesnoth_id() != connection_num )
01004             {
01005                 ana_send_handler handler;
01006                 (*it)->client()->send( ana::buffer( output_string ), &handler); //, ana::ZERO_COPY);
01007                 handler.wait_completion();
01008             }
01009         }
01010     }
01011 }
01012 
01013 network::connection ana_network_manager::read_from_ready_buffer(
01014                                          const ana_component_set::iterator& it, config& cfg)
01015 {
01016     read_config( (*it)->wait_for_element(), cfg);
01017 
01018     return (*it)->get_wesnoth_id();
01019 }
01020 
01021 network::connection ana_network_manager::read_from( const ana_component_set::iterator& it,
01022                                                     config&             cfg,
01023                                                     size_t              timeout_ms)
01024 {
01025     if (  (*it)->new_buffer_ready() )
01026         return read_from_ready_buffer( it, cfg );
01027     else if (timeout_ms == 0 )
01028         return 0;
01029     else
01030     {
01031         ana_receive_handler handler(it);
01032         (*it)->listener()->set_listener_handler( &handler );
01033 
01034         if (  (*it)->new_buffer_ready() )
01035             return read_from_ready_buffer( it, cfg );
01036         else
01037         {
01038             if ( (*it)->is_server() )
01039                 handler.wait_completion( (*it)->server(), timeout_ms );
01040             else
01041                 if ( (*it)->get_wesnoth_id() != 0 )
01042                     handler.wait_completion( (*it)->client(), timeout_ms );
01043                 else
01044                     return 0;
01045                 //Don't try to read from a still unconnected client
01046         }
01047 
01048         (*it)->listener()->set_listener_handler( this );
01049 
01050         if ( handler.error() )
01051             return 0;
01052         else
01053             return read_from_ready_buffer( it, cfg );
01054     }
01055 }
01056 
01057 network::connection ana_network_manager::read_from( network::connection connection_num,
01058                                                     config&             cfg,
01059                                                     size_t              timeout_ms)
01060 {
01061     if ( components_.empty() )
01062         return 0;
01063 
01064     ana_component_set::iterator it;
01065 
01066     if ( connection_num == 0 )
01067     {
01068         if ( components_.size() == 1 )
01069             return read_from( components_.begin(), cfg, timeout_ms );
01070         else
01071         {
01072             //Check first if there is an available buffer
01073             for (it = components_.begin(); it != components_.end(); ++it)
01074                 if (  (*it)->new_buffer_ready() )
01075                     return read_from_ready_buffer( it, cfg );
01076 
01077             // If no timeout was requested, return
01078             if (timeout_ms == 0 )
01079                 return 0;
01080 
01081             // Wait timeout_ms milliseconds to see if any component will receive something
01082             ana_multiple_receive_handler handler( components_ );
01083 
01084             for (it = components_.begin(); it != components_.end(); ++it )
01085                 (*it)->listener()->set_listener_handler( &handler );
01086 
01087             handler.wait_completion( timeout_ms );
01088 
01089             for (it = components_.begin(); it != components_.end(); ++it )
01090                 (*it)->listener()->set_listener_handler( this );
01091 
01092             if ( handler.error() )
01093             {
01094                 for (it = components_.begin(); it != components_.end(); ++it)
01095                     if (  (*it)->new_buffer_ready() )
01096                         return read_from_ready_buffer( it, cfg );
01097 
01098                 // So nothing was read:
01099                 return 0;
01100             }
01101             else
01102             {
01103                 read_config( handler.buffer(), cfg);
01104                 return handler.get_wesnoth_id();
01105             }
01106         }
01107     }
01108     else
01109     {
01110         ana::net_id id( connection_num );
01111 
01112         it = std::find_if( components_.begin(), components_.end(),
01113                     boost::bind(std::logical_or<bool>(),
01114                         (boost::bind(&ana_component::get_wesnoth_id, _1) == connection_num),
01115                         (boost::bind(&ana_component::get_id, _1) == id ) ) );
01116         //Make a broad attempt at finding it, test for both ANA's id and the assigned one.
01117 
01118 
01119         if ( it != components_.end())
01120             return read_from(it, cfg, timeout_ms);
01121         else
01122             throw std::runtime_error("Trying a network read from an invalid component id.");
01123     }
01124 }
01125 
01126 network::connection ana_network_manager::read_from_all( std::vector<char>& vec)
01127 {
01128     ana_component_set::iterator it;
01129 
01130     if ( components_.empty() )
01131         return 0;
01132 
01133     for (it = components_.begin(); it != components_.end(); ++it)
01134     {
01135         if (  (*it)->new_buffer_ready() )
01136         {
01137             ana::read_buffer buffer = (*it)->wait_for_element();
01138 
01139             char* ch = buffer->base_char();
01140             for (size_t i = 0; i < buffer->size(); ++i)  // copy the buffer
01141                 vec.push_back( *(ch++) );
01142 
01143             if ( (*it)->is_client() )
01144                 return (*it)->get_wesnoth_id();
01145             else
01146                 return (*it)->oldest_sender_id_still_pending();
01147         }
01148     }
01149 
01150     // there wasn't any buffer ready
01151     return 0;
01152 }
01153 
01154 network::statistics ana_network_manager::get_send_stats(network::connection handle)
01155 {
01156     if ( handle != 0 )
01157     {
01158 //         ana::net_id id( handle );
01159         std::set< ana_component* >::iterator it;
01160 
01161         it = std::find_if( components_.begin(), components_.end(),
01162                             boost::bind(&ana_component::get_wesnoth_id, _1) == handle );
01163 
01164         if ( it != components_.end() )
01165             return (*it)->get_send_stats( );
01166         else
01167             throw std::runtime_error("Trying to get stats from the wrong component.");
01168     }
01169     else if( ! components_.empty() )
01170     {
01171         std::set< ana_component* >::iterator it = components_.begin();
01172         return (*it)->get_send_stats();
01173     }
01174     else
01175         return network::statistics();
01176 }
01177 
01178 network::statistics ana_network_manager::get_receive_stats(network::connection handle)
01179 {
01180     if ( handle != 0 )
01181     {
01182         ana::net_id id( handle );
01183         std::set< ana_component* >::iterator it;
01184 
01185         it = std::find_if( components_.begin(), components_.end(),
01186                             boost::bind(&ana_component::get_id, _1) == id );
01187 
01188         if ( it != components_.end() )
01189             return (*it)->get_receive_stats( );
01190         else
01191             throw std::runtime_error("Received message from a non connected component.");
01192     }
01193     else if( ! components_.empty() )
01194     {
01195         std::set< ana_component* >::iterator it = components_.begin();
01196         return (*it)->get_receive_stats();
01197     }
01198     else
01199         return network::statistics();
01200 }
01201 
01202 bool ana_network_manager::disconnect( network::connection handle)
01203 {
01204     if ( handle == 0 )
01205         close_connections_and_cleanup();
01206     else
01207     {
01208         ana::net_id id( handle );
01209         ana_component_set::iterator it;
01210 
01211         it = std::find_if( components_.begin(), components_.end(),
01212                        boost::bind(std::logical_or<bool>(),
01213                            (boost::bind(&ana_component::get_wesnoth_id, _1) == handle),
01214                            (boost::bind(&ana_component::get_id, _1) == id ) ));
01215         //Make a broad attempt at finding it, test for both ANA's id and the assigned one.
01216 
01217         if ( it == components_.end())
01218             throw std::runtime_error("Trying to disconnect an invalid component.");
01219         else
01220         {
01221             if ( (*it)->is_server() )
01222                 throw std::runtime_error("Can't disconnect server directly.");
01223             else
01224                 (*it)->client()->disconnect();
01225         }
01226     }
01227     return true;
01228 }
01229 
01230 // --- Proxy methods
01231 void ana_network_manager::enable_connection_through_proxy()
01232 {
01233     proxy_settings_.enabled = true;
01234 }
01235 
01236 void ana_network_manager::set_proxy_address ( const std::string& address  )
01237 {
01238     proxy_settings_.address = address;
01239 }
01240 
01241 void ana_network_manager::set_proxy_port    ( const std::string& port     )
01242 {
01243     proxy_settings_.port = port;
01244 }
01245 
01246 void ana_network_manager::set_proxy_user    ( const std::string& user     )
01247 {
01248     proxy_settings_.user = user;
01249 }
01250 
01251 void ana_network_manager::set_proxy_password( const std::string& password )
01252 {
01253     proxy_settings_.password = password;
01254 }
01255 // --- End Proxy methods
01256 
01257 
01258 void ana_network_manager::handle_send(ana::error_code error_code,
01259                                       ana::net_id client,
01260                                       ana::operation_id /*op_id*/)
01261 {
01262     if ( error_code )
01263         network::disconnect( client );
01264 }
01265 
01266 void ana_network_manager::handle_receive( ana::error_code          error,
01267                                           ana::net_id              client,
01268                                           ana::read_buffer buffer)
01269 {
01270     if (error)
01271         network::disconnect( client );
01272     else
01273     {
01274         std::set< ana_component* >::iterator it;
01275 
01276         it = std::find_if( components_.begin(), components_.end(),
01277                             boost::bind(&ana_component::get_id, _1) == client );
01278 
01279         if ( it != components_.end() )
01280             (*it)->add_buffer( buffer, client );
01281         else
01282         {
01283             if (components_.empty() )
01284                 throw std::runtime_error("Received a message while no component was running.\n");
01285 
01286             std::map< ana::server*, clients_manager* >::iterator mgrs;
01287 
01288             for ( mgrs = server_manager_.begin(); mgrs != server_manager_.end(); ++mgrs)
01289             {
01290                 if (mgrs->second->is_a_client( client ) ) // Is this your client?
01291                 {
01292                     if ( mgrs->second->is_pending_handshake( client ) ) // Did he login already?
01293                     {
01294                         // all handshakes are 4 bytes long
01295                         if ( buffer->size() != sizeof(ana::ana_uint32) )
01296                             mgrs->first->disconnect( client );
01297 
01298                         ana::ana_uint32 handshake;
01299                         {
01300                             ana::serializer::bistream bis( buffer->string() );
01301 
01302                             bis >> handshake;
01303                             ana::network_to_host_long( handshake ); //I'm expecting a 0 anyway
01304                         }
01305 
01306                         if ( handshake != 0 )
01307                             mgrs->first->disconnect( client );
01308                         else
01309                         {
01310                             mgrs->second->handshaked( client );
01311                             //send back it's id
01312                             ana::serializer::bostream bos;
01313                             ana::ana_uint32 network_byte_order_id = client;
01314                             ana::host_to_network_long( network_byte_order_id );
01315                             bos << network_byte_order_id;
01316 
01317                             ana_handshake_finisher_handler* handler
01318                                 = new ana_handshake_finisher_handler( mgrs->first,
01319                                                                       mgrs->second);
01320 
01321                             mgrs->first->send_one(client, ana::buffer( bos.str() ), handler );
01322                             mgrs->first->set_header_first_mode( client );
01323                         }
01324                     }
01325                     else // just add the buffer to the associated clients_manager
01326                     {
01327                         ana::net_id server_id = mgrs->first->id();
01328 
01329                         it = std::find_if( components_.begin(), components_.end(),
01330                                         boost::bind(&ana_component::get_id, _1) == server_id );
01331 
01332                         if ( (*it)->is_client() )
01333                             throw std::runtime_error("Wrong id to receive from.");
01334 
01335                         (*it)->add_buffer( buffer, client );
01336                     }
01337                 }
01338             }
01339         }
01340     }
01341 }
01342 
01343 bool ana_component::new_buffer_ready() // non const due to mutex block
01344 {
01345     boost::mutex::scoped_lock lock( mutex_ );
01346 
01347     return ! buffers_.empty();
01348 }
01349 
01350 
01351 void ana_network_manager::handle_disconnect(ana::error_code /*error_code*/, ana::net_id client)
01352 {
01353     std::set< ana_component* >::iterator it;
01354 
01355     it = std::find_if( components_.begin(), components_.end(),
01356                         boost::bind(&ana_component::get_id, _1) == client );
01357 
01358     if ( it != components_.end() )
01359     {
01360         disconnected_components_.push( *it );
01361         components_.erase(it);
01362     }
01363     else
01364     {
01365         for (it = components_.begin(); it != components_.end(); ++it )
01366             if ( (*it)->is_server() )
01367                 if ( server_manager_[ (*it)->server() ]->is_a_client( client ) )
01368                 {
01369                     server_manager_[ (*it)->server() ]->remove( client );
01370                     disconnected_ids_.push( client );
01371                 }
01372     }
01373 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated by doxygen 1.7.1 on Fri May 25 2012 01:03:07 for The Battle for Wesnoth
Gna! | Forum | Wiki | CIA | devdocs