00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include <boost/bind.hpp>
00034 #include <boost/thread.hpp>
00035
00036 #include "asio_listener.hpp"
00037
00038 using boost::asio::ip::tcp;
00039
00040 asio_listener::asio_listener( ) :
00041 disconnected_( false ),
00042 listener_( NULL ),
00043 header_(),
00044 raw_mode_buffer_size_( ana::INITIAL_RAW_MODE_BUFFER_SIZE ),
00045 next_message_timer_( NULL )
00046 {
00047 }
00048
00049 asio_listener::~asio_listener()
00050 {
00051 delete next_message_timer_;
00052 }
00053
00054 void asio_listener::wait_for_incoming_message( size_t ms_to_timeout, ana::net_id id )
00055 {
00056 if ( (next_message_timer_ == NULL) && ( ms_to_timeout > 0 ) )
00057 {
00058 next_message_timer_ = new ana::timer( socket().get_io_service() );
00059
00060 next_message_timer_->wait( ms_to_timeout,
00061 boost::bind(&asio_listener::handle_timeout, this,
00062 boost::asio::placeholders::error, id) );
00063 }
00064 }
00065
00066 void asio_listener::disconnect( boost::system::error_code error)
00067 {
00068 if ( ! disconnected_ )
00069 {
00070 listener_->handle_disconnect( error, id() );
00071 disconnected_ = true;
00072 disconnect_listener();
00073 }
00074 }
00075
00076 void asio_listener::set_raw_buffer_max_size( size_t size )
00077 {
00078 if ( size == 0 )
00079 throw std::runtime_error("Can't set raw buffer size to 0.");
00080
00081 raw_mode_buffer_size_ = size;
00082 }
00083
00084 void asio_listener::handle_body( ana::read_buffer buf, const boost::system::error_code& ec)
00085 {
00086 try
00087 {
00088 if (ec)
00089 disconnect( ec );
00090 else
00091 {
00092 stats_collector().log_receive( buf );
00093 listener_->handle_receive( ec, id(), buf );
00094 listen_one_message();
00095 }
00096 }
00097 catch(const std::exception&)
00098 {
00099 disconnect( ec);
00100 }
00101 }
00102
00103 void asio_listener::handle_header(char* header, const boost::system::error_code& ec)
00104 {
00105 try
00106 {
00107 if (ec)
00108 disconnect( ec);
00109 else
00110 {
00111 stats_collector().log_receive( ana::HEADER_LENGTH );
00112 ana::serializer::bistream input( std::string(header, ana::HEADER_LENGTH) );
00113
00114 ana::ana_uint32 size;
00115 input >> size;
00116 ana::network_to_host_long( size );
00117
00118 stats_collector().start_receive_packet( size );
00119
00120 if (size != 0)
00121 {
00122 ana::read_buffer read_buf(
00123 new ana::detail::read_buffer_implementation( size ) );
00124
00125 socket().async_read_some(boost::asio::buffer(read_buf->base(), read_buf->size() ),
00126 boost::bind(&asio_listener::handle_partial_body,
00127 this, read_buf,
00128 boost::asio::placeholders::error, 0, _2 ));
00129 }
00130 else
00131 {
00132 ana::read_buffer read_buf ( new ana::detail::read_buffer_implementation(
00133 ana::HEADER_LENGTH ) );
00134
00135 for (size_t i(0); i< ana::HEADER_LENGTH; ++i)
00136 static_cast<char*>(read_buf->base())[i] = header[i];
00137
00138 listener_->handle_receive( ec, id(), read_buf );
00139 }
00140 }
00141 }
00142 catch(const std::exception&)
00143 {
00144 disconnect(ec);
00145 }
00146 }
00147
00148 void asio_listener::handle_partial_body( ana::read_buffer buffer,
00149 const boost::system::error_code& ec,
00150 size_t accumulated,
00151 size_t last_msg_size)
00152 {
00153 try
00154 {
00155 if (ec)
00156 disconnect( ec );
00157 else
00158 {
00159 accumulated += last_msg_size;
00160
00161
00162 stats_collector().log_receive( last_msg_size, accumulated == buffer->size() );
00163
00164
00165 if ( accumulated > buffer->size() )
00166 throw std::runtime_error("The read operation was too large.");
00167
00168 if ( accumulated == buffer->size() )
00169 {
00170 listener_->handle_receive( ec, id(), buffer );
00171 delete next_message_timer_;
00172 next_message_timer_ = NULL;
00173 listen_one_message();
00174 }
00175 else
00176 socket().async_read_some(boost::asio::buffer(buffer->base_char() + accumulated,
00177 buffer->size() - accumulated),
00178 boost::bind(&asio_listener::handle_partial_body,
00179 this, buffer,
00180 boost::asio::placeholders::error,
00181 accumulated, _2 ));
00182 }
00183 }
00184 catch(const std::exception&)
00185 {
00186 disconnect( ec);
00187 }
00188 }
00189
00190 void asio_listener::handle_timeout( const boost::system::error_code& error_code, ana::net_id id)
00191 {
00192 delete next_message_timer_;
00193 next_message_timer_ = NULL;
00194
00195 if ( error_code != ana::operation_aborted )
00196 listener_->handle_receive( ana::timeout_error, id, ana::read_buffer() );
00197 }
00198
00199 void asio_listener::handle_raw_buffer( ana::read_buffer buf,
00200 const boost::system::error_code& ec,
00201 size_t read_size)
00202 {
00203 try
00204 {
00205 delete next_message_timer_;
00206 next_message_timer_ = NULL;
00207
00208 if (ec)
00209 disconnect( ec );
00210 else
00211 {
00212 buf->resize( read_size );
00213 stats_collector().log_receive( buf );
00214 listener_->handle_receive( ec, id(), buf );
00215 listen_one_message();
00216 }
00217 }
00218 catch(const std::exception&)
00219 {
00220 disconnect( ec);
00221 }
00222 }
00223
00224 void asio_listener::wait_raw_object(ana::serializer::bistream& bis, size_t size)
00225 {
00226 tcp::socket& sock = socket();
00227
00228 std::vector<char> buf(size);
00229
00230 size_t received;
00231
00232 received = sock.receive( boost::asio::buffer( &buf[0], size ) );
00233
00234 if ( received != size )
00235 throw std::runtime_error("Read a different amount of bytes than what was expected.");
00236
00237 bis.str( std::string( &buf[0], size ) );
00238 }
00239
00240
00241 void asio_listener::set_listener_handler( ana::listener_handler* listener )
00242 {
00243 listener_ = listener;
00244 }
00245
00246 void asio_listener::run_listener( )
00247 {
00248 listen_one_message();
00249 }
00250
00251 void asio_listener::listen_one_message()
00252 {
00253 try
00254 {
00255 if ( header_mode() )
00256 boost::asio::async_read(socket(), boost::asio::buffer(header_, ana::HEADER_LENGTH),
00257 boost::bind(&asio_listener::handle_header, this,
00258 header_, boost::asio::placeholders::error));
00259 else
00260 {
00261 ana::read_buffer raw_buffer(
00262 new ana::detail::read_buffer_implementation( raw_mode_buffer_size_ ) );
00263
00264 socket().async_read_some(boost::asio::buffer(raw_buffer->base(),
00265 raw_mode_buffer_size_ ),
00266 boost::bind(&asio_listener::handle_raw_buffer, this,
00267 raw_buffer, boost::asio::placeholders::error, _2));
00268 }
00269 }
00270 catch(const std::exception&)
00271 {
00272 disconnect( ana::generic_error );
00273 }
00274 }
00275