00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include <boost/bind.hpp>
00034
00035 #include "asio_sender.hpp"
00036
00037 void asio_sender::send(ana::detail::shared_buffer buffer ,
00038 tcp::socket& socket ,
00039 ana::send_handler* handler,
00040 ana::detail::sender* sender ,
00041 ana::operation_id op_id )
00042 {
00043 ana::timer* running_timer( NULL );
00044 try
00045 {
00046 if ( sender->timeouts_enabled() )
00047 {
00048 running_timer = sender->create_timer();
00049
00050 sender->start_timer( running_timer, buffer,
00051 boost::bind(&asio_sender::handle_send, this,
00052 boost::asio::placeholders::error, handler,
00053 running_timer, op_id, true ) );
00054 }
00055
00056 stats_collector().start_send_packet( buffer->size()
00057 + ( raw_mode() ? 0 : ana::HEADER_LENGTH ) );
00058
00059 if ( raw_mode() )
00060 {
00061 socket.async_write_some( boost::asio::buffer(buffer->base(), buffer->size() ),
00062 boost::bind(&asio_sender::handle_partial_send,this,
00063 buffer, boost::asio::placeholders::error,
00064 &socket, handler, running_timer, 0, _2, op_id ));
00065 }
00066 else
00067 {
00068 ana::ana_uint32 size( buffer->size() );
00069 ana::host_to_network_long( size );
00070
00071 ana::serializer::bostream* output_stream = new ana::serializer::bostream();
00072 (*output_stream) << size;
00073
00074
00075 socket.async_write_some( boost::asio::buffer( output_stream->str() ),
00076 boost::bind(&asio_sender::handle_sent_header,this,
00077 boost::asio::placeholders::error, output_stream,
00078 &socket, buffer,
00079 handler, running_timer, _2, op_id ));
00080 }
00081 }
00082 catch(const std::exception&)
00083 {
00084 disconnect();
00085 delete running_timer;
00086 }
00087 }
00088
00089
00090 void asio_sender::handle_sent_header(const ana::error_code& ec,
00091 ana::serializer::bostream* bos,
00092 tcp::socket* socket,
00093 ana::detail::shared_buffer buffer,
00094 ana::send_handler* handler,
00095 ana::timer* running_timer,
00096 size_t bytes_sent,
00097 ana::operation_id op_id)
00098 {
00099 delete bos;
00100
00101 if (bytes_sent != sizeof( ana::ana_uint32 ) )
00102 throw std::runtime_error("Couldn't send header.");
00103
00104 if ( ec )
00105 handle_send(ec, handler, running_timer, op_id);
00106 else
00107 {
00108 socket->async_write_some( boost::asio::buffer(buffer->base(), buffer->size() ),
00109 boost::bind(&asio_sender::handle_partial_send,this,
00110 buffer, boost::asio::placeholders::error,
00111 socket, handler, running_timer, 0, _2, op_id ));
00112 }
00113 }
00114
00115 void asio_sender::handle_partial_send( ana::detail::shared_buffer buffer,
00116 const ana::error_code& ec,
00117 tcp::socket* socket,
00118 ana::send_handler* handler,
00119 ana::timer* timer,
00120 size_t accumulated,
00121 size_t last_msg_size,
00122 ana::operation_id op_id)
00123 {
00124 try
00125 {
00126 if (ec)
00127 handle_send(ec, handler, timer, op_id);
00128 else
00129 {
00130 accumulated += last_msg_size;
00131
00132 stats_collector().log_send( last_msg_size, accumulated == buffer->size() );
00133
00134 if ( accumulated > buffer->size() )
00135 throw std::runtime_error("The send operation was too large.");
00136
00137 if ( accumulated == buffer->size() )
00138 handle_send( ec, handler, timer, op_id );
00139 else
00140 socket->async_write_some(boost::asio::buffer(buffer->base_char() + accumulated,
00141 buffer->size() - accumulated),
00142 boost::bind(&asio_sender::handle_partial_send, this,
00143 buffer, boost::asio::placeholders::error,
00144 socket, handler, timer,
00145 accumulated, _2, op_id ));
00146 }
00147 }
00148 catch(const std::exception&)
00149 {
00150 disconnect( );
00151 }
00152 }
00153
00154 void asio_sender::handle_send(const ana::error_code& ec,
00155 ana::send_handler* handler,
00156 ana::timer* running_timer,
00157 ana::operation_id op_id,
00158 bool from_timeout)
00159 {
00160 if ( ec != boost::asio::error::operation_aborted )
00161 {
00162 delete running_timer;
00163
00164 if ( ec && from_timeout )
00165 handler->handle_send( ana::timeout_error , id(), op_id );
00166 else
00167 handler->handle_send( ec, id(), op_id );
00168
00169 if ( ec )
00170 disconnect();
00171 }
00172 }
00173