refactor: store outgoing handshake in `tr_peer_info` objects (#6103)

This commit is contained in:
Yat Ho 2023-10-15 01:58:15 +08:00 committed by GitHub
parent 8ef7eba3bd
commit 6cec60f1fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 154 additions and 122 deletions

View File

@ -138,6 +138,108 @@ bool tr_peer_info::is_blocklisted(tr_session const* session) const
return value;
}
void tr_peer_info::merge(tr_peer_info& that) noexcept
{
TR_ASSERT(is_connectable_.value_or(true) || !is_connected());
TR_ASSERT(that.is_connectable_.value_or(true) || !that.is_connected());
connection_attempted_at_ = std::max(connection_attempted_at_, that.connection_attempted_at_);
connection_changed_at_ = std::max(connection_changed_at_, that.connection_changed_at_);
piece_data_at_ = std::max(piece_data_at_, that.piece_data_at_);
/* no need to merge blocklist since it gets updated elsewhere */
{
// This part is frankly convoluted and confusing, but the idea is:
// 1. If the two peer info objects agree that this peer is connectable/non-connectable,
// then the answer is straightforward: We keep the agreed value.
// 2. If the two peer info objects disagrees as to whether this peer is connectable,
// then we reset the flag to an empty value, so that we can try for ourselves when
// initiating outgoing connections.
// 3. If one object has knowledge and the other doesn't, then we take the word of the
// peer info object with knowledge with one exception:
// - If the object with knowledge says the peer is not connectable, but we are
// currently connected to the peer, then we give it the benefit of the doubt.
// The connectable flag will be reset to an empty value.
// 4. In case both objects have no knowledge about whether this peer is connectable,
// we shall not make any assumptions: We keep the flag empty.
//
// Truth table:
// +-----------------+---------------+----------------------+--------------------+---------+
// | is_connectable_ | is_connected_ | that.is_connectable_ | that.is_connected_ | Result |
// +=================+===============+======================+====================+=========+
// | T | T | T | T | T |
// | T | T | T | F | T |
// | T | T | F | F | ? |
// | T | T | ? | T | T |
// | T | T | ? | F | T |
// | T | F | T | T | T |
// | T | F | T | F | T |
// | T | F | F | F | ? |
// | T | F | ? | T | T |
// | T | F | ? | F | T |
// | F | F | T | T | ? |
// | F | F | T | F | ? |
// | F | F | F | F | F |
// | F | F | ? | T | ? |
// | F | F | ? | F | F |
// | ? | T | T | T | T |
// | ? | T | T | F | T |
// | ? | T | F | F | ? |
// | ? | T | ? | T | ? |
// | ? | T | ? | F | ? |
// | ? | F | T | T | T |
// | ? | F | T | F | T |
// | ? | F | F | F | F |
// | ? | F | ? | T | ? |
// | ? | F | ? | F | ? |
// | N/A | N/A | F | T | Invalid |
// | F | T | N/A | N/A | Invalid |
// +-----------------+---------------+----------------------+--------------------+---------+
auto const conn_this = is_connectable_ && *is_connectable_;
auto const conn_that = that.is_connectable_ && *that.is_connectable_;
if ((!is_connectable_ && !that.is_connectable_) ||
is_connectable_.value_or(conn_that || is_connected()) !=
that.is_connectable_.value_or(conn_this || that.is_connected()))
{
is_connectable_.reset();
}
else
{
set_connectable(conn_this || conn_that);
}
}
set_utp_supported(supports_utp() || that.supports_utp());
/* from_first_ should never be modified */
found_at(that.from_best());
/* num_consecutive_fails_ is already the latest */
pex_flags_ |= that.pex_flags_;
if (that.is_banned())
{
ban();
}
/* is_connected_ should already be set */
set_seed(is_seed() || that.is_seed());
if (that.outgoing_handshake_)
{
if (outgoing_handshake_)
{
that.destroy_handshake();
}
else
{
outgoing_handshake_ = std::move(that.outgoing_handshake_);
}
}
}
#define tr_logAddDebugSwarm(swarm, msg) tr_logAddDebugTor((swarm)->tor, msg)
#define tr_logAddTraceSwarm(swarm, msg) tr_logAddTraceTor((swarm)->tor, msg)
@ -214,12 +316,9 @@ public:
{
auto const lock = unique_lock();
TR_ASSERT(!is_running);
TR_ASSERT(std::empty(outgoing_handshakes));
TR_ASSERT(std::empty(peers));
}
[[nodiscard]] bool peer_is_in_use(tr_peer_info const& peer_info) const;
void cancelOldRequests()
{
auto const now = tr_time();
@ -264,7 +363,10 @@ public:
is_running = false;
remove_all_peers();
outgoing_handshakes.clear();
for (auto& [sockaddr, peer_info] : connectable_pool)
{
peer_info.destroy_handshake();
}
}
void remove_peer(tr_peerMsgs* peer)
@ -494,8 +596,6 @@ public:
}
}
Handshakes outgoing_handshakes;
mutable tr_swarm_stats stats = {};
uint8_t optimistic_unchoke_time_scaler = 0;
@ -1082,35 +1182,34 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, tr_
auto const& socket_address = result.io->socket_address();
auto* const info_ptr = s != nullptr ? s->get_existing_peer_info(socket_address) : nullptr;
if (result.io->is_incoming())
{
manager->incoming_handshakes.erase(socket_address);
}
else if (s != nullptr)
else if (info_ptr != nullptr)
{
s->outgoing_handshakes.erase(socket_address);
info_ptr->destroy_handshake();
}
auto const lock = manager->unique_lock();
if (!ok || s == nullptr || !s->is_running)
{
if (s != nullptr)
if (info_ptr != nullptr && !info_ptr->is_connected())
{
if (auto* const info = s->get_existing_peer_info(socket_address); info != nullptr && !info->is_connected())
{
info->on_connection_failed();
info_ptr->on_connection_failed();
if (!result.read_anything_from_peer)
{
tr_logAddTraceSwarm(
s,
fmt::format(
"marking peer {} as unreachable... num_fails is {}",
info->display_name(),
info->connection_failure_count()));
info->set_connectable(false);
}
if (!result.read_anything_from_peer)
{
tr_logAddTraceSwarm(
s,
fmt::format(
"marking peer {} as unreachable... num_fails is {}",
info_ptr->display_name(),
info_ptr->connection_failure_count()));
info_ptr->set_connectable(false);
}
}
}
@ -1118,7 +1217,7 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, tr_
{
// If this is an outgoing connection, then we are sure we already have the peer info object
auto& info = result.io->is_incoming() ? s->ensure_info_exists(socket_address, 0U, TR_PEER_FROM_INCOMING, false) :
*s->get_existing_peer_info(socket_address);
*info_ptr;
if (!result.io->is_incoming())
{
@ -1277,7 +1376,7 @@ namespace get_peers_helpers
return false;
}
if (tor->swarm->peer_is_in_use(info))
if (info.is_in_use())
{
return true;
}
@ -2157,12 +2256,6 @@ void tr_peerMgr::bandwidthPulse()
// ---
bool tr_swarm::peer_is_in_use(tr_peer_info const& peer_info) const
{
// TODO(tearfur): maybe it's possible to store each handshake in the peer_info objects
return peer_info.is_connected() || outgoing_handshakes.count(peer_info.listen_socket_address()) != 0U;
}
namespace
{
namespace connect_helpers
@ -2183,7 +2276,7 @@ namespace connect_helpers
}
// not if we've already got a connection to them...
if (tor->swarm->peer_is_in_use(peer_info))
if (peer_info.is_in_use())
{
return false;
}
@ -2402,8 +2495,7 @@ void initiate_connection(tr_peerMgr* mgr, tr_swarm* s, tr_peer_info& peer_info)
}
else
{
s->outgoing_handshakes.try_emplace(
peer_info.listen_socket_address(),
peer_info.start_handshake(
&mgr->handshake_mediator_,
peer_io,
session->encryptionMode(),

View File

@ -19,6 +19,7 @@
#include "libtransmission/transmission.h" // tr_block_span_t (ptr only)
#include "libtransmission/handshake.h"
#include "libtransmission/net.h" /* tr_address */
#include "libtransmission/tr-assert.h"
#include "libtransmission/utils.h" /* tr_compare_3way */
@ -215,6 +216,31 @@ public:
return is_connected_;
}
[[nodiscard]] auto has_handshake() const noexcept
{
return static_cast<bool>(outgoing_handshake_);
}
template<typename... Args>
void start_handshake(Args&&... args)
{
TR_ASSERT(!outgoing_handshake_);
if (!outgoing_handshake_)
{
outgoing_handshake_ = std::make_unique<tr_handshake>(std::forward<Args>(args)...);
}
}
void destroy_handshake() noexcept
{
outgoing_handshake_.reset();
}
[[nodiscard]] auto is_in_use() const noexcept
{
return is_connected() || has_handshake();
}
// ---
[[nodiscard]] bool is_blocklisted(tr_session const* session) const;
@ -347,95 +373,7 @@ public:
// ---
// merge two peer info objects that supposedly describes the same peer
void merge(tr_peer_info const& that) noexcept
{
TR_ASSERT(is_connectable_.value_or(true) || !is_connected());
TR_ASSERT(that.is_connectable_.value_or(true) || !that.is_connected());
connection_attempted_at_ = std::max(connection_attempted_at_, that.connection_attempted_at_);
connection_changed_at_ = std::max(connection_changed_at_, that.connection_changed_at_);
piece_data_at_ = std::max(piece_data_at_, that.piece_data_at_);
/* no need to merge blocklist since it gets updated elsewhere */
{
// This part is frankly convoluted and confusing, but the idea is:
// 1. If the two peer info objects agree that this peer is connectable/non-connectable,
// then the answer is straightforward: We keep the agreed value.
// 2. If the two peer info objects disagrees as to whether this peer is connectable,
// then we reset the flag to an empty value, so that we can try for ourselves when
// initiating outgoing connections.
// 3. If one object has knowledge and the other doesn't, then we take the word of the
// peer info object with knowledge with one exception:
// - If the object with knowledge says the peer is not connectable, but we are
// currently connected to the peer, then we give it the benefit of the doubt.
// The connectable flag will be reset to an empty value.
// 4. In case both objects have no knowledge about whether this peer is connectable,
// we shall not make any assumptions: We keep the flag empty.
//
// Truth table:
// +-----------------+---------------+----------------------+--------------------+---------+
// | is_connectable_ | is_connected_ | that.is_connectable_ | that.is_connected_ | Result |
// +=================+===============+======================+====================+=========+
// | T | T | T | T | T |
// | T | T | T | F | T |
// | T | T | F | F | ? |
// | T | T | ? | T | T |
// | T | T | ? | F | T |
// | T | F | T | T | T |
// | T | F | T | F | T |
// | T | F | F | F | ? |
// | T | F | ? | T | T |
// | T | F | ? | F | T |
// | F | F | T | T | ? |
// | F | F | T | F | ? |
// | F | F | F | F | F |
// | F | F | ? | T | ? |
// | F | F | ? | F | F |
// | ? | T | T | T | T |
// | ? | T | T | F | T |
// | ? | T | F | F | ? |
// | ? | T | ? | T | ? |
// | ? | T | ? | F | ? |
// | ? | F | T | T | T |
// | ? | F | T | F | T |
// | ? | F | F | F | F |
// | ? | F | ? | T | ? |
// | ? | F | ? | F | ? |
// | N/A | N/A | F | T | Invalid |
// | F | T | N/A | N/A | Invalid |
// +-----------------+---------------+----------------------+--------------------+---------+
auto const conn_this = is_connectable_ && *is_connectable_;
auto const conn_that = that.is_connectable_ && *that.is_connectable_;
if ((!is_connectable_ && !that.is_connectable_) ||
is_connectable_.value_or(conn_that || is_connected()) !=
that.is_connectable_.value_or(conn_this || that.is_connected()))
{
is_connectable_.reset();
}
else
{
set_connectable(conn_this || conn_that);
}
}
set_utp_supported(supports_utp() || that.supports_utp());
/* from_first_ should never be modified */
found_at(that.from_best());
/* num_consecutive_fails_ is already the latest */
pex_flags_ |= that.pex_flags_;
if (that.is_banned())
{
ban();
}
/* is_connected_ should already be set */
set_seed(is_seed() || that.is_seed());
}
void merge(tr_peer_info& that) noexcept;
private:
[[nodiscard]] constexpr time_t get_reconnect_interval_secs(time_t const now) const noexcept
@ -502,6 +440,8 @@ private:
bool is_banned_ = false;
bool is_connected_ = false;
bool is_seed_ = false;
std::unique_ptr<tr_handshake> outgoing_handshake_;
};
struct tr_pex