The Battle for Wesnoth  1.17.12+dev
wesnothd_connection.cpp
Go to the documentation of this file.
1 /*
2  Copyright (C) 2011 - 2022
3  by Sergey Popov <loonycyborg@gmail.com>
4  Part of the Battle for Wesnoth Project https://www.wesnoth.org/
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY.
12 
13  See the COPYING file for more details.
14 */
15 
16 #define BOOST_ASIO_NO_DEPRECATED
17 
18 #include "wesnothd_connection.hpp"
19 
20 #include "gettext.hpp"
22 #include "log.hpp"
23 #include "preferences/general.hpp"
24 #include "serialization/parser.hpp"
25 #include "tls_root_store.hpp"
26 
27 #include <boost/asio/connect.hpp>
28 #include <boost/asio/read.hpp>
29 #include <boost/asio/write.hpp>
30 
31 #include <cstdint>
32 #include <deque>
33 #include <functional>
34 
35 static lg::log_domain log_network("network");
36 #define DBG_NW LOG_STREAM(debug, log_network)
37 #define LOG_NW LOG_STREAM(info, log_network)
38 #define WRN_NW LOG_STREAM(warn, log_network)
39 #define ERR_NW LOG_STREAM(err, log_network)
40 
41 #if 0
42 // code for the travis test
43 #include <sys/types.h>
44 #include <unistd.h>
45 namespace {
46 struct mptest_log
47 {
48  mptest_log(const char* functionname)
49  {
50  WRN_NW << "Process:" << getpid() << " Thread:" << std::this_thread::get_id() << " Function: " << functionname << " Start";
51  }
52 };
53 }
54 #define MPTEST_LOG mptest_log mptest_log__(__func__)
55 #else
56 #define MPTEST_LOG ((void)0)
57 #endif
58 
59 using boost::system::error_code;
60 using boost::system::system_error;
61 
62 using namespace std::chrono_literals; // s, ms, etc
63 
64 // main thread
65 wesnothd_connection::wesnothd_connection(const std::string& host, const std::string& service)
66  : worker_thread_()
67  , io_context_()
68  , resolver_(io_context_)
69  , tls_context_(boost::asio::ssl::context::sslv23)
70  , host_(host)
71  , service_(service)
72  , use_tls_(true)
73  , socket_(raw_socket{ new raw_socket::element_type{io_context_} })
74  , last_error_()
77  , read_buf_()
79  , recv_queue_()
82  , payload_size_(0)
83  , bytes_to_write_(0)
84  , bytes_written_(0)
85  , bytes_to_read_(0)
86  , bytes_read_(0)
87 {
88  MPTEST_LOG;
89 
90  error_code ec;
91  auto result = resolver_.resolve(host, service, boost::asio::ip::resolver_query_base::numeric_host, ec);
92  if(!ec) { // if numeric resolve succeeds then we got raw ip address so TLS host name validation would never pass
93  use_tls_ = false;
94  boost::asio::post(io_context_, [this, ec, result](){ handle_resolve(ec, { result } ); } );
95  } else {
96  resolver_.async_resolve(host, service,
97  std::bind(&wesnothd_connection::handle_resolve, this, std::placeholders::_1, std::placeholders::_2));
98  }
99 
100  // Starts the worker thread. Do this *after* the above async_resolve call or it will just exit immediately!
101  worker_thread_ = std::thread([this]() {
102  try {
103  io_context_.run();
104  } catch(const boost::system::system_error&) {
105  try {
106  // Attempt to pass the exception on to the handshake promise.
107  handshake_finished_.set_exception(std::current_exception());
108  } catch(const std::future_error&) {
109  // Handshake already complete. Do nothing.
110  }
111  } catch(...) {
112  DBG_NW << "wesnothd_connection worker thread threw general exception: " << utils::get_unknown_exception_type();
113  }
114 
115  LOG_NW << "wesnothd_connection::io_service::run() returned";
116  });
117 
118  LOG_NW << "Resolving hostname: " << host;
119 }
120 
122 {
123  MPTEST_LOG;
124 
125  if(auto socket = utils::get_if<tls_socket>(&socket_)) {
126  error_code ec;
127  // this sends close_notify for secure connection shutdown
128  (*socket)->async_shutdown([](const error_code&) {} );
129  const char buffer[] = "";
130  // this write is needed to trigger immediate close instead of waiting for other side's close_notify
131  boost::asio::write(**socket, boost::asio::buffer(buffer, 0), ec);
132  }
133  // Stop the io_service and wait for the worker thread to terminate.
134  stop();
135  worker_thread_.join();
136 }
137 
138 // worker thread
139 void wesnothd_connection::handle_resolve(const error_code& ec, results_type results)
140 {
141  MPTEST_LOG;
142  if(ec) {
143  LOG_NW << __func__ << " Throwing: " << ec;
144  throw system_error(ec);
145  }
146 
147  boost::asio::async_connect(*utils::get<raw_socket>(socket_), results,
148  std::bind(&wesnothd_connection::handle_connect, this, std::placeholders::_1, std::placeholders::_2));
149 }
150 
151 // worker thread
152 void wesnothd_connection::handle_connect(const boost::system::error_code& ec, endpoint endpoint)
153 {
154  MPTEST_LOG;
155  if(ec) {
156  ERR_NW << "Tried all IPs. Giving up";
157  throw system_error(ec);
158  } else {
159  LOG_NW << "Connected to " << endpoint.address();
160 
161  if(endpoint.address().is_loopback()) {
162  use_tls_ = false;
163  }
164  handshake();
165  }
166 }
167 
168 // worker thread
170 {
171  MPTEST_LOG;
172 
173  DBG_NW << "Connecting with keepalive of: " << preferences::keepalive_timeout();
174  set_keepalive(preferences::keepalive_timeout());
175 
176  static const uint32_t handshake = 0;
177  static const uint32_t tls_handshake = htonl(uint32_t(1));
178 
179  boost::asio::async_write(*utils::get<raw_socket>(socket_), boost::asio::buffer(use_tls_ ? reinterpret_cast<const char*>(&tls_handshake) : reinterpret_cast<const char*>(&handshake), 4),
180  [](const error_code& ec, std::size_t) { if(ec) { throw system_error(ec); } });
181  boost::asio::async_read(*utils::get<raw_socket>(socket_), boost::asio::buffer(reinterpret_cast<std::byte*>(&handshake_response_), 4),
182  std::bind(&wesnothd_connection::handle_handshake, this, std::placeholders::_1));
183 }
184 
185 template<typename Verifier> auto verbose_verify(Verifier&& verifier)
186 {
187  return [verifier](bool preverified, boost::asio::ssl::verify_context& ctx) {
188  char subject_name[256];
189  X509* cert = X509_STORE_CTX_get_current_cert(ctx.native_handle());
190  X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256);
191  bool verified = verifier(preverified, ctx);
192  DBG_NW << "Verifying TLS certificate: " << subject_name << ": " <<
193  (verified ? "verified" : "failed");
194  BIO* bio = BIO_new(BIO_s_mem());
195  char buffer[1024];
196  X509_print(bio, cert);
197  while(BIO_read(bio, buffer, 1024) > 0)
198  {
199  DBG_NW << buffer;
200  }
201  BIO_free(bio);
202  return verified;
203  };
204 }
205 
206 // worker thread
207 void wesnothd_connection::handle_handshake(const error_code& ec)
208 {
209  MPTEST_LOG;
210  if(ec) {
211  if(ec == boost::asio::error::eof && use_tls_) {
212  // immediate disconnect likely means old server not supporting TLS handshake code
214  return;
215  }
216  LOG_NW << __func__ << " Throwing: " << ec;
217  throw system_error(ec);
218  }
219 
220  if(use_tls_) {
221  if(handshake_response_ == 0xFFFFFFFFU) {
222  use_tls_ = false;
223  handle_handshake(ec);
224  return;
225  }
226 
227  if(handshake_response_ == 0x00000000) {
229  raw_socket s { std::move(utils::get<raw_socket>(socket_)) };
230  tls_socket ts { new tls_socket::element_type{std::move(*s), tls_context_} };
231  socket_ = std::move(ts);
232 
233  auto& socket { *utils::get<tls_socket>(socket_) };
234 
235  socket.set_verify_mode(
236  boost::asio::ssl::verify_peer |
237  boost::asio::ssl::verify_fail_if_no_peer_cert
238  );
239 
240 #if BOOST_VERSION >= 107300
241  socket.set_verify_callback(verbose_verify(boost::asio::ssl::host_name_verification(host_)));
242 #else
243  socket.set_verify_callback(verbose_verify(boost::asio::ssl::rfc2818_verification(host_)));
244 #endif
245 
246  socket.async_handshake(boost::asio::ssl::stream_base::client, [this](const error_code& ec) {
247  if(ec) {
248  LOG_NW << __func__ << " Throwing: " << ec;
249  throw system_error(ec);
250  }
251 
252  handshake_finished_.set_value();
253  recv();
254  });
255  return;
256  }
257 
259  } else {
260  handshake_finished_.set_value();
261  recv();
262  }
263 }
264 
265 // worker thread
267 {
268  assert(use_tls_ == true);
269  use_tls_ = false;
270 
271  boost::asio::ip::tcp::endpoint endpoint { utils::get<raw_socket>(socket_)->remote_endpoint() };
272  utils::get<raw_socket>(socket_)->close();
273 
274  utils::get<raw_socket>(socket_)->async_connect(endpoint,
275  std::bind(&wesnothd_connection::handle_connect, this, std::placeholders::_1, endpoint));
276 }
277 
278 // main thread
280 {
281  MPTEST_LOG;
282  LOG_NW << "Waiting for handshake";
283 
284  try {
285  // TODO: make this duration customizable. Should default to 1 minute.
286  auto timeout = 60s;
287 
288  auto future = handshake_finished_.get_future();
289  for(auto time = 0ms;
290  future.wait_for(10ms) == std::future_status::timeout
291  && time < timeout;
292  time += 10ms)
293  {
295  }
296 
297  switch(future.wait_for(0ms)) {
298  case std::future_status::ready:
299  // This is a void future, so this just serves to re-throw any system_error exceptions
300  // stored by the worker thread. Additional handling occurs in the catch block below.
301  future.get();
302  break;
303  case std::future_status::timeout:
304  throw error(boost::asio::error::make_error_code(boost::asio::error::timed_out));
305  default:
306  break;
307  }
308  } catch(const boost::system::system_error& err) {
309  if(err.code() == boost::asio::error::operation_aborted || err.code() == boost::asio::error::eof) {
310  return;
311  }
312 
313  WRN_NW << __func__ << " Rethrowing: " << err.code();
314  throw error(err.code());
315  } catch(const std::future_error& e) {
316  if(e.code() == std::future_errc::future_already_retrieved) {
317  return;
318  }
319  }
320 }
321 
322 // main thread
324 {
325  MPTEST_LOG;
326 
327  auto buf_ptr = std::make_unique<boost::asio::streambuf>();
328 
329  std::ostream os(buf_ptr.get());
330  write_gz(os, request);
331 
332  boost::asio::post(io_context_, [this, buf_ptr = std::move(buf_ptr)]() mutable {
333 
334  DBG_NW << "In wesnothd_connection::send_data::lambda";
335  send_queue_.push(std::move(buf_ptr));
336 
337  if(send_queue_.size() == 1) {
338  send();
339  }
340  });
341 }
342 
343 // main thread
345 {
346  MPTEST_LOG;
347  utils::visit([](auto&& socket) {
348  if(socket->lowest_layer().is_open()) {
349  boost::system::error_code ec;
350 
351 #ifdef _MSC_VER
352 // Silence warning about boost::asio::basic_socket<Protocol>::cancel always
353 // returning an error on XP, which we don't support anymore.
354 #pragma warning(push)
355 #pragma warning(disable:4996)
356 #endif
357  socket->lowest_layer().cancel(ec);
358 #ifdef _MSC_VER
359 #pragma warning(pop)
360 #endif
361 
362  if(ec) {
363  WRN_NW << "Failed to cancel network operations: " << ec.message();
364  }
365  }
366  }, socket_);
367 }
368 
369 // main thread
371 {
372  // TODO: wouldn't cancel() have the same effect?
373  MPTEST_LOG;
374  io_context_.stop();
375 }
376 
377 // worker thread
378 std::size_t wesnothd_connection::is_write_complete(const boost::system::error_code& ec, std::size_t bytes_transferred)
379 {
380  MPTEST_LOG;
381  if(ec) {
382  {
383  std::scoped_lock lock(last_error_mutex_);
384  last_error_ = ec;
385  }
386 
387  LOG_NW << __func__ << " Error: " << ec;
388 
389  io_context_.stop();
390  return bytes_to_write_ - bytes_transferred;
391  }
392 
393  bytes_written_ = bytes_transferred;
394  return bytes_to_write_ - bytes_transferred;
395 }
396 
397 // worker thread
398 void wesnothd_connection::handle_write(const boost::system::error_code& ec, std::size_t bytes_transferred)
399 {
400  MPTEST_LOG;
401  DBG_NW << "Written " << bytes_transferred << " bytes.";
402 
403  send_queue_.pop();
404 
405  if(ec) {
406  {
407  std::scoped_lock lock(last_error_mutex_);
408  last_error_ = ec;
409  }
410 
411  LOG_NW << __func__ << " Error: " << ec;
412 
413  io_context_.stop();
414  return;
415  }
416 
417  if(!send_queue_.empty()) {
418  send();
419  }
420 }
421 
422 // worker thread
423 std::size_t wesnothd_connection::is_read_complete(const boost::system::error_code& ec, std::size_t bytes_transferred)
424 {
425  // We use custom is_write/read_complete function to be able to see the current progress of the upload/download
426  MPTEST_LOG;
427  if(ec) {
428  {
429  std::scoped_lock lock(last_error_mutex_);
430  last_error_ = ec;
431  }
432 
433  LOG_NW << __func__ << " Error: " << ec;
434 
435  io_context_.stop();
436  return bytes_to_read_ - bytes_transferred;
437  }
438 
439  bytes_read_ = bytes_transferred;
440 
441  if(bytes_transferred < 4) {
442  return 4;
443  }
444 
445  if(!bytes_to_read_) {
446  std::istream is(&read_buf_);
447  uint32_t data_size;
448 
449  is.read(reinterpret_cast<char*>(&data_size), 4);
450  bytes_to_read_ = ntohl(data_size) + 4;
451 
452  // Close immediately if we receive an invalid length
453  if(bytes_to_read_ < 4) {
454  bytes_to_read_ = bytes_transferred;
455  }
456  }
457 
458  return bytes_to_read_ - bytes_transferred;
459 }
460 
461 // worker thread
462 void wesnothd_connection::handle_read(const boost::system::error_code& ec, std::size_t bytes_transferred)
463 {
464  MPTEST_LOG;
465  DBG_NW << "Read " << bytes_transferred << " bytes.";
466 
467  bytes_to_read_ = 0;
468  if(last_error_ && ec != boost::asio::error::eof) {
469  {
470  std::scoped_lock lock(last_error_mutex_);
471  last_error_ = ec;
472  }
473 
474  LOG_NW << __func__ << " Error: " << ec;
475 
476  io_context_.stop();
477  return;
478  }
479 
480  std::istream is(&read_buf_);
481  config data;
482  read_gz(data, is);
483  if(!data.empty()) { DBG_NW << "Received:\n" << data; }
484 
485  {
486  std::scoped_lock lock(recv_queue_mutex_);
487  recv_queue_.emplace(std::move(data));
488  recv_queue_lock_.notify_all();
489  }
490 
491  recv();
492 }
493 
494 // worker thread
496 {
497  MPTEST_LOG;
498  auto& buf = *send_queue_.front();
499 
500  std::size_t buf_size = buf.size();
501  bytes_to_write_ = buf_size + 4;
502  bytes_written_ = 0;
503  payload_size_ = htonl(buf_size);
504 
505  std::deque<boost::asio::const_buffer> bufs {
506  boost::asio::buffer(reinterpret_cast<const char*>(&payload_size_), 4),
507  buf.data()
508  };
509 
510  utils::visit([this, &bufs](auto&& socket) {
511  boost::asio::async_write(*socket, bufs,
512  std::bind(&wesnothd_connection::is_write_complete, this, std::placeholders::_1, std::placeholders::_2),
513  std::bind(&wesnothd_connection::handle_write, this, std::placeholders::_1, std::placeholders::_2));
514  }, socket_);
515 }
516 
517 // worker thread
519 {
520  MPTEST_LOG;
521 
522  utils::visit([this](auto&& socket) {
523  boost::asio::async_read(*socket, read_buf_,
524  std::bind(&wesnothd_connection::is_read_complete, this, std::placeholders::_1, std::placeholders::_2),
525  std::bind(&wesnothd_connection::handle_read, this, std::placeholders::_1, std::placeholders::_2));
526  }, socket_);
527 }
528 
529 // main thread
531 {
532  MPTEST_LOG;
533 
534  {
535  std::scoped_lock lock(recv_queue_mutex_);
536  if(!recv_queue_.empty()) {
537  result.swap(recv_queue_.front());
538  recv_queue_.pop();
539  return true;
540  }
541  }
542 
543  {
544  std::scoped_lock lock(last_error_mutex_);
545  if(last_error_) {
546  std::string user_msg;
547 
548  if(last_error_ == boost::asio::error::eof) {
549  user_msg = _("Disconnected from server.");
550  }
551 
552  throw error(last_error_, user_msg);
553  }
554  }
555 
556  return false;
557 }
558 
560 {
561  {
562  std::unique_lock<std::mutex> lock(recv_queue_mutex_);
563  while(!recv_queue_lock_.wait_for(
564  lock, 10ms, [this]() { return has_data_received(); }))
565  {
567  }
568  }
569 
570  return receive_data(data);
571 };
572 
574 {
575  boost::asio::socket_base::keep_alive option(true);
576  utils::get<raw_socket>(socket_)->set_option(option);
577 
578 #ifdef __linux__
579  int timeout = 10;
580  int cnt = std::max((seconds - 10) / 10, 1);
581  int interval = 10;
582  setsockopt(utils::get<raw_socket>(socket_)->native_handle(), SOL_TCP, TCP_KEEPIDLE, &timeout, sizeof(timeout));
583  setsockopt(utils::get<raw_socket>(socket_)->native_handle(), SOL_TCP, TCP_KEEPCNT, &cnt, sizeof(cnt));
584  setsockopt(utils::get<raw_socket>(socket_)->native_handle(), SOL_TCP, TCP_KEEPINTVL, &interval, sizeof(interval));
585 #elif defined(__APPLE__) && defined(__MACH__)
586  setsockopt(utils::get<raw_socket>(socket_)->native_handle(), IPPROTO_TCP, TCP_KEEPALIVE, &seconds, sizeof(seconds));
587 #elif defined(_WIN32)
588  // these are in milliseconds for windows
589  DWORD timeout_ms = seconds * 1000;
590  setsockopt(utils::get<raw_socket>(socket_)->native_handle(), SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char*>(&timeout_ms), sizeof(timeout_ms));
591  setsockopt(utils::get<raw_socket>(socket_)->native_handle(), SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast<const char*>(&timeout_ms), sizeof(timeout_ms));
592 #endif
593 }
void send_data(const configr_of &request)
Queues the given data to be sent to the server.
std::size_t is_read_complete(const boost::system::error_code &error, std::size_t bytes_transferred)
std::condition_variable recv_queue_lock_
std::promise< void > handshake_finished_
boost::asio::ssl::context tls_context_
void handle_write(const boost::system::error_code &ec, std::size_t bytes_transferred)
wesnothd_connection_error error
resolver::results_type results_type
boost::asio::io_context io_context_
std::string_view data
Definition: picture.cpp:206
std::size_t is_write_complete(const boost::system::error_code &error, std::size_t bytes_transferred)
void handle_connect(const boost::system::error_code &ec, endpoint endpoint)
void load_tls_root_certs(boost::asio::ssl::context &ctx)
std::string get_unknown_exception_type()
Utility function for finding the type of thing caught with catch(...).
Definition: general.cpp:23
bool receive_data(config &result)
Receives the next pending data pack from the server, if available.
void read_gz(config &cfg, std::istream &file, abstract_validator *validator)
Might throw a std::ios_base::failure especially a gzip_error.
Definition: parser.cpp:683
static std::string _(const char *str)
Definition: gettext.hpp:93
#define ERR_NW
#define MPTEST_LOG
#define DBG_NW
std::unique_ptr< boost::asio::ssl::stream< raw_socket::element_type > > tls_socket
void write(std::ostream &out, const configr_of &cfg, unsigned int level)
Definition: parser.cpp:764
void swap(config &cfg)
Definition: config.cpp:1447
void write_gz(std::ostream &out, const configr_of &cfg)
Definition: parser.cpp:783
#define WRN_NW
void handle_handshake(const boost::system::error_code &ec)
static void spin()
Indicate to the player that loading is progressing.
boost::asio::streambuf read_buf_
boost::system::error_code last_error_
int keepalive_timeout()
Definition: general.cpp:505
logger & err()
Definition: log.cpp:170
static map_location::DIRECTION s
void set_keepalive(int seconds)
auto verbose_verify(Verifier &&verifier)
wesnothd_connection(const wesnothd_connection &)=delete
static lg::log_domain log_network("network")
data_queue< config > recv_queue_
void handle_read(const boost::system::error_code &ec, std::size_t bytes_transferred)
const boost::asio::ip::tcp::endpoint & endpoint
data_queue< std::unique_ptr< boost::asio::streambuf > > send_queue_
Standard logging facilities (interface).
void handle_resolve(const boost::system::error_code &ec, results_type results)
bool wait_and_receive_data(config &data)
Unlike receive_data, waits until data is available instead of returning immediately.
std::unique_ptr< boost::asio::ip::tcp::socket > raw_socket
#define e
A config object defines a single node in a WML file, with access to child nodes.
Definition: config.hpp:60
#define LOG_NW
bool empty() const
Definition: config.cpp:941
void wait_for_handshake()
Waits until the server handshake is complete.