00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include <deque>
00017 #include <boost/bind.hpp>
00018 #include <boost/ref.hpp>
00019 #include <boost/cstdint.hpp>
00020 #include <boost/version.hpp>
00021 #include "log.hpp"
00022 #include "network_asio.hpp"
00023 #include "serialization/parser.hpp"
00024
00025 static lg::log_domain log_network("network");
00026 #define DBG_NW LOG_STREAM(debug, log_network)
00027 #define LOG_NW LOG_STREAM(info, log_network)
00028 #define WRN_NW LOG_STREAM(warn, log_network)
00029 #define ERR_NW LOG_STREAM(err, log_network)
00030
00031 namespace network_asio {
00032
00033 using boost::system::system_error;
00034
00035 connection::connection(const std::string& host, const std::string& service)
00036 : io_service_()
00037 , resolver_(io_service_)
00038 , socket_(io_service_)
00039 , done_(false)
00040 , write_buf_()
00041 , read_buf_()
00042 , handshake_response_()
00043 , bytes_to_write_(0)
00044 , bytes_written_(0)
00045 , bytes_to_read_(0)
00046 , bytes_read_(0)
00047 {
00048 resolver_.async_resolve(
00049 boost::asio::ip::tcp::resolver::query(host, service),
00050 boost::bind(&connection::handle_resolve, this, _1, _2)
00051 );
00052 LOG_NW << "Resolving hostname: " << host << '\n';
00053 }
00054
00055 void connection::handle_resolve(
00056 const boost::system::error_code& ec,
00057 resolver::iterator iterator
00058 )
00059 {
00060 if(ec)
00061 throw system_error(ec);
00062
00063 connect(iterator);
00064 }
00065
00066 void connection::connect(resolver::iterator iterator)
00067 {
00068 socket_.async_connect(*iterator, boost::bind(
00069 &connection::handle_connect, this, _1, iterator)
00070 );
00071 LOG_NW << "Connecting to " << iterator->endpoint().address() << '\n';
00072 }
00073
00074 void connection::handle_connect(
00075 const boost::system::error_code& ec,
00076 resolver::iterator iterator
00077 )
00078 {
00079 if(ec) {
00080 WRN_NW << "Failed to connect to " <<
00081 iterator->endpoint().address() << ": " <<
00082 ec.message() << '\n';
00083 socket_.close();
00084 if(++iterator == resolver::iterator()) {
00085 ERR_NW << "Tried all IPs. Giving up\n";
00086 throw system_error(ec);
00087 } else
00088 connect(iterator);
00089 } else {
00090 LOG_NW << "Connected to " << iterator->endpoint().address() << '\n';
00091 handshake();
00092 }
00093 }
00094
00095 void connection::handshake()
00096 {
00097 static const boost::uint32_t handshake = 0;
00098 boost::asio::async_write(socket_,
00099 boost::asio::buffer(reinterpret_cast<const char*>(&handshake), 4),
00100 boost::bind(&connection::handle_write, this, _1, _2)
00101 );
00102 boost::asio::async_read(socket_,
00103 boost::asio::buffer(&handshake_response_.binary, 4),
00104 boost::bind(&connection::handle_handshake, this, _1)
00105 );
00106 }
00107
00108 void connection::handle_handshake(
00109 const boost::system::error_code& ec
00110 )
00111 {
00112 if(ec)
00113 throw system_error(ec);
00114 done_ = true;
00115 }
00116
00117 void connection::transfer(const config& request, config& response)
00118 {
00119 io_service_.reset();
00120 done_ = false;
00121
00122 std::ostream os(&write_buf_);
00123 write_gz(os, request);
00124 bytes_to_write_ = write_buf_.size() + 4;
00125 bytes_written_ = 0;
00126 payload_size_ = htonl(bytes_to_write_ - 4);
00127 boost::asio::streambuf::const_buffers_type gzipped_data = write_buf_.data();
00128 std::deque<boost::asio::const_buffer> bufs(gzipped_data.begin(), gzipped_data.end());
00129 bufs.push_front(boost::asio::buffer(reinterpret_cast<const char*>(&payload_size_), 4));
00130 boost::asio::async_write(socket_, bufs,
00131 boost::bind(&connection::is_write_complete, this, _1, _2),
00132 boost::bind(&connection::handle_write, this, _1, _2)
00133 );
00134 boost::asio::async_read(socket_, read_buf_,
00135 boost::bind(&connection::is_read_complete, this, _1, _2),
00136 boost::bind(&connection::handle_read, this, _1, _2, boost::ref(response))
00137 );
00138 }
00139
00140 void connection::cancel()
00141 {
00142 if(socket_.is_open()) {
00143 boost::system::error_code ec;
00144 socket_.cancel(ec);
00145 if(ec) {
00146 WRN_NW << "Failed to cancel network operations: " << ec.message() << "\n";
00147 }
00148 }
00149 }
00150
00151 std::size_t connection::is_write_complete(
00152 const boost::system::error_code& ec,
00153 std::size_t bytes_transferred
00154 )
00155 {
00156 if(ec)
00157 throw system_error(ec);
00158 bytes_written_ = bytes_transferred;
00159 #if BOOST_VERSION >= 103700
00160 return bytes_to_write_ - bytes_transferred;
00161 #else
00162 return bytes_to_write_ == bytes_transferred;
00163 #endif
00164 }
00165
00166 void connection::handle_write(
00167 const boost::system::error_code& ec,
00168 std::size_t bytes_transferred
00169 )
00170 {
00171 DBG_NW << "Written " << bytes_transferred << " bytes.\n";
00172 write_buf_.consume(bytes_transferred);
00173 if(ec)
00174 throw system_error(ec);
00175 }
00176
00177 std::size_t connection::is_read_complete(
00178 const boost::system::error_code& ec,
00179 std::size_t bytes_transferred
00180 )
00181 {
00182 if(ec)
00183 throw system_error(ec);
00184 bytes_read_ = bytes_transferred;
00185 if(bytes_transferred < 4) {
00186 return 4;
00187 } else {
00188 if(!bytes_to_read_) {
00189 std::istream is(&read_buf_);
00190 union { char binary[4]; boost::uint32_t num; } data_size;
00191 is.read(data_size.binary, 4);
00192 bytes_to_read_ = ntohl(data_size.num) + 4;
00193 }
00194 #if BOOST_VERSION >= 103700
00195 return bytes_to_read_ - bytes_transferred;
00196 #else
00197 return bytes_to_read_ == bytes_transferred;
00198 #endif
00199 }
00200 }
00201
00202 void connection::handle_read(
00203 const boost::system::error_code& ec,
00204 std::size_t bytes_transferred,
00205 config& response
00206 )
00207 {
00208 DBG_NW << "Read " << bytes_transferred << " bytes.\n";
00209 bytes_to_read_ = 0;
00210 bytes_to_write_ = 0;
00211 done_ = true;
00212 if(ec && ec != boost::asio::error::eof)
00213 throw system_error(ec);
00214 std::istream is(&read_buf_);
00215 read_gz(response, is);
00216 }
00217
00218 }