00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include <iostream>
00034
00035 #include <memory>
00036
00037 #include <boost/bind.hpp>
00038 #include <boost/thread.hpp>
00039
00040 #include "asio_client.hpp"
00041
00042 using boost::asio::ip::tcp;
00043
00044 asio_client::asio_client(ana::address address, ana::port pt) :
00045 asio_listener(),
00046 io_service_(),
00047 io_threads_(),
00048 work_( io_service_ ),
00049 socket_(io_service_),
00050 address_(address),
00051 port_(pt),
00052 connect_timeout_ms_( 0 ),
00053 proxy_( NULL ),
00054 use_proxy_( false ),
00055 stats_collector_( ),
00056 last_valid_operation_id_( ana::no_operation ),
00057 connection_informed_mutex_(),
00058 connection_informed_( false )
00059 {
00060 }
00061
00062 asio_client::~asio_client()
00063 {
00064 disconnect_listener();
00065
00066 std::list< boost::thread* >::iterator it;
00067
00068 it = io_threads_.begin();
00069
00070 while (it != io_threads_.end())
00071 {
00072 (*it)->join();
00073 it = io_threads_.erase( it );
00074 }
00075 }
00076
00077 ana::client* ana::client::create(ana::address address, ana::port pt)
00078 {
00079 return new asio_client(address, pt);
00080 }
00081
00082 void asio_client::run()
00083 {
00084 io_threads_.push_back(
00085 new boost::thread( boost::bind( &boost::asio::io_service::run, &io_service_) ) );
00086 }
00087
00088 void asio_client::handle_proxy_connection(const boost::system::error_code& ec,
00089 ana::connection_handler* handler,
00090 ana::timer* timer)
00091 {
00092 delete timer;
00093
00094 inform_connection_result( handler, ec);
00095
00096 if ( ( ! ec ) && ( ana::client::header_mode() ) )
00097 run_listener();
00098
00099 delete proxy_;
00100 }
00101
00102 tcp::socket& asio_client::socket()
00103 {
00104 return socket_;
00105 }
00106
00107 void asio_client::handle_connect(const boost::system::error_code& ec,
00108 tcp::resolver::iterator endpoint_iterator,
00109 ana::connection_handler* handler,
00110 ana::timer* timer)
00111 {
00112 if ( ! ec )
00113 {
00114 delete timer;
00115
00116 inform_connection_result( handler, ec);
00117
00118 if ( ana::client::header_mode() )
00119 run_listener();
00120 }
00121 else
00122 {
00123 if ( endpoint_iterator == tcp::resolver::iterator() )
00124 inform_connection_result( handler, ec);
00125 else
00126 {
00127
00128 socket_.close();
00129
00130 tcp::endpoint endpoint = *endpoint_iterator;
00131 socket_.async_connect(endpoint,
00132 boost::bind(&asio_client::handle_connect, this,
00133 boost::asio::placeholders::error, ++endpoint_iterator,
00134 handler, timer));
00135 }
00136 }
00137 }
00138
00139 void asio_client::inform_connection_result( ana::connection_handler* handler, ana::error_code ec)
00140 {
00141 boost::mutex::scoped_lock lock( connection_informed_mutex_ );
00142
00143 if( ! connection_informed_ )
00144 {
00145 connection_informed_ = true;
00146 handler->handle_connect( ec, 0 );
00147 }
00148 }
00149
00150 void asio_client::handle_timeout(const boost::system::error_code& ec,
00151 ana::connection_handler* handler,
00152 ana::timer* timer)
00153 {
00154 if ( ec != ana::operation_aborted )
00155 {
00156 delete timer;
00157
00158 inform_connection_result( handler, ec);
00159 cancel_pending();
00160 }
00161 }
00162
00163 ana::timer* asio_client::start_connection_timer(ana::connection_handler* handler)
00164 {
00165 if ( connect_timeout_ms_ == 0 )
00166 return NULL;
00167 else
00168 {
00169 ana::timer* running_timer( NULL );
00170
00171 ana::client* sender = static_cast<ana::client*>(this);
00172
00173 running_timer = sender->create_timer();
00174
00175 running_timer->wait( connect_timeout_ms_,
00176 boost::bind(&asio_client::handle_timeout, this,
00177 boost::asio::placeholders::error, handler,
00178 running_timer ) );
00179 return running_timer;
00180 }
00181 }
00182
00183 void asio_client::connect( ana::connection_handler* handler )
00184 {
00185 ana::timer* running_timer = start_connection_timer(handler);
00186
00187 try
00188 {
00189 tcp::resolver resolver(io_service_);
00190 tcp::resolver::query query(address_.c_str(), port_.c_str() );
00191 tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
00192
00193 tcp::endpoint endpoint = *endpoint_iterator;
00194 socket_.async_connect(endpoint,
00195 boost::bind(&asio_client::handle_connect, this,
00196 boost::asio::placeholders::error, ++endpoint_iterator,
00197 handler, running_timer));
00198 }
00199 catch (const std::exception&)
00200 {
00201 inform_connection_result( handler, ana::generic_error );
00202 }
00203 }
00204
00205 void asio_client::connect_through_proxy(std::string proxy_address,
00206 std::string proxy_port,
00207 ana::connection_handler* handler,
00208 std::string user_name,
00209 std::string password)
00210 {
00211 ana::timer* running_timer = start_connection_timer(handler);
00212
00213 use_proxy_ = true;
00214
00215 proxy_information proxy_info;
00216
00217 proxy_info.proxy_address = proxy_address;
00218 proxy_info.proxy_port = proxy_port;
00219 proxy_info.user_name = user_name;
00220 proxy_info.password = password;
00221
00222 proxy_ = new proxy_connection( socket_, proxy_info, address_, port_, running_timer);
00223
00224 proxy_->connect( this, handler );
00225 }
00226
00227 ana::operation_id asio_client::send(boost::asio::const_buffer buffer,
00228 ana::send_handler* handler,
00229 ana::send_type copy_buffer )
00230 {
00231 ana::detail::shared_buffer s_buf(new ana::detail::copying_buffer(buffer, copy_buffer ) );
00232
00233 asio_sender::send(s_buf,
00234 socket_,
00235 handler,
00236 static_cast<ana::client*>(this),
00237 ++last_valid_operation_id_ );
00238
00239 return last_valid_operation_id_;
00240 }
00241
00242 void asio_client::log_receive( ana::read_buffer buffer )
00243 {
00244 stats_collector_.log_receive( buffer );
00245 }
00246
00247 const ana::stats* asio_client::get_stats( ana::stat_type type ) const
00248 {
00249 return stats_collector_.get_stats( type );
00250 }
00251
00252 ana::stats_collector& asio_client::stats_collector()
00253 {
00254 return stats_collector_;
00255 }
00256
00257 void asio_client::cancel_pending()
00258 {
00259 socket_.cancel();
00260 }
00261
00262 void asio_client::set_connect_timeout( size_t ms )
00263 {
00264 connect_timeout_ms_ = ms;
00265 }
00266
00267 void asio_client::expecting_message( size_t ms_until_timeout )
00268 {
00269 wait_for_incoming_message( ms_until_timeout );
00270 }
00271
00272 std::string asio_client::ip_address() const
00273 {
00274 return socket_.remote_endpoint().address().to_string();
00275 }
00276
00277 void asio_client::disconnect_listener()
00278 {
00279 io_service_.stop();
00280 }
00281