refactor: replace `peer_atom` with `tr_peer_info` (#5773)

This commit is contained in:
Charles Kerr 2023-07-14 07:06:25 -05:00 committed by GitHub
parent 2ec19e694d
commit c867f00153
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 497 additions and 397 deletions

View File

@ -28,7 +28,7 @@
class tr_peer;
class tr_swarm;
struct peer_atom;
class tr_peer_info;
struct tr_bandwidth;
// --- Peer Publish / Subscribe
@ -175,13 +175,13 @@ using tr_peer_callback = void (*)(tr_peer* peer, tr_peer_event const& event, voi
/**
* State information about a connected peer.
*
* @see struct peer_atom
* @see tr_peer_info
* @see tr_peerMsgs
*/
class tr_peer
{
public:
tr_peer(tr_torrent const* tor, peer_atom* atom = nullptr);
tr_peer(tr_torrent const* tor, tr_peer_info* const peer_info = nullptr);
virtual ~tr_peer();
virtual bool isTransferringPieces(uint64_t now, tr_direction dir, tr_bytes_per_second_t* setme_bytes_per_second) const = 0;
@ -242,7 +242,7 @@ public:
/// TODO(ckerr): refactor them out of `tr_peer`
// hook to private peer-mgr information
peer_atom* const atom;
tr_peer_info* const peer_info;
// whether or not this peer sent us any given block
tr_bitfield blame;

View File

@ -5,7 +5,6 @@
#include <algorithm>
#include <array>
#include <atomic>
#include <cerrno> // error codes ERANGE, ...
#include <chrono>
#include <cmath>
@ -14,7 +13,6 @@
#include <ctime> // time_t
#include <iterator> // std::back_inserter
#include <map>
#include <memory>
#include <optional>
#include <tuple> // std::tie
#include <unordered_map>
@ -122,197 +120,17 @@ private:
tr_session& session_;
};
/**
* Peer information that should be kept even before we've connected and
* after we've disconnected. These are kept in a pool of peer_atoms to decide
* which ones would make good candidates for connecting to, and to watch out
* for banned peers.
*
* @see tr_peer
* @see tr_peerMsgs
*/
struct peer_atom
bool tr_peer_info::is_blocklisted(tr_session const* session) const
{
peer_atom(tr_socket_address const& socket_address_in, uint8_t flags_in, uint8_t from)
: socket_address{ socket_address_in }
, fromFirst{ from }
, fromBest{ from }
, flags{ flags_in }
if (blocklisted_)
{
++n_atoms;
return *blocklisted_;
}
peer_atom(peer_atom&&) = delete;
peer_atom(peer_atom const&) = delete;
peer_atom& operator=(peer_atom&&) = delete;
peer_atom& operator=(peer_atom const&) = delete;
~peer_atom()
{
[[maybe_unused]] auto const n_prev = n_atoms--;
TR_ASSERT(n_prev > 0U);
}
[[nodiscard]] static auto atom_count() noexcept
{
return n_atoms.load();
}
[[nodiscard]] constexpr auto isSeed() const noexcept
{
return (flags & ADDED_F_SEED_FLAG) != 0;
}
[[nodiscard]] constexpr auto const& addr() const noexcept
{
return socket_address.address();
}
[[nodiscard]] constexpr auto& port() noexcept
{
return socket_address.port_;
}
[[nodiscard]] constexpr auto port() const noexcept
{
return socket_address.port();
}
[[nodiscard]] auto display_name() const
{
return socket_address.display_name();
}
[[nodiscard]] bool isBlocklisted(tr_session const* session) const
{
if (blocklisted_)
{
return *blocklisted_;
}
auto const value = session->addressIsBlocked(addr());
blocklisted_ = value;
return value;
}
[[nodiscard]] constexpr auto is_unreachable() const noexcept
{
return is_unreachable_;
}
constexpr void set_unreachable() noexcept
{
is_unreachable_ = true;
}
constexpr void set_reachable() noexcept
{
is_unreachable_ = false;
}
[[nodiscard]] constexpr int getReconnectIntervalSecs(time_t const now) const noexcept
{
auto sec = int{};
auto const unreachable = is_unreachable();
/* if we were recently connected to this peer and transferring piece
* data, try to reconnect to them sooner rather that later -- we don't
* want network troubles to get in the way of a good peer. */
if (!unreachable && now - this->piece_data_time <= MinimumReconnectIntervalSecs * 2)
{
sec = MinimumReconnectIntervalSecs;
}
/* otherwise, the interval depends on how many times we've tried
* and failed to connect to the peer */
else
{
auto step = this->num_fails;
/* penalize peers that were unreachable the last time we tried */
if (unreachable)
{
step += 2;
}
switch (step)
{
case 0:
sec = 0;
break;
case 1:
sec = 10;
break;
case 2:
sec = 60 * 2;
break;
case 3:
sec = 60 * 15;
break;
case 4:
sec = 60 * 30;
break;
case 5:
sec = 60 * 60;
break;
default:
sec = 60 * 120;
break;
}
}
return sec;
}
void setBlocklistedDirty()
{
blocklisted_.reset();
}
[[nodiscard]] constexpr auto is_banned() const noexcept
{
return is_banned_;
}
constexpr void ban() noexcept
{
is_banned_ = true;
}
tr_socket_address socket_address;
uint16_t num_fails = {};
time_t time = {}; /* when the peer's connection status last changed */
time_t piece_data_time = {};
time_t lastConnectionAttemptAt = {};
time_t lastConnectionAt = {};
uint8_t const fromFirst; /* where the peer was first found */
uint8_t fromBest; /* the "best" value of where the peer has been found */
uint8_t flags = {}; /* these match the added_f flags */
uint8_t flags2 = {}; /* flags that aren't defined in added_f */
bool utp_failed = false; /* We recently failed to connect over µTP */
bool is_connected = false;
private:
// the minimum we'll wait before attempting to reconnect to a peer
static auto constexpr MinimumReconnectIntervalSecs = int{ 5 };
static auto inline n_atoms = std::atomic<size_t>{};
mutable std::optional<bool> blocklisted_;
bool is_banned_ = false;
bool is_unreachable_ = false; // we tried to connect & failed
};
auto const value = session->addressIsBlocked(addr());
blocklisted_ = value;
return value;
}
using Handshakes = std::unordered_map<tr_socket_address, tr_handshake>;
@ -359,7 +177,7 @@ public:
TR_ASSERT(std::empty(peers));
}
[[nodiscard]] bool peer_is_in_use(peer_atom const& atom) const;
[[nodiscard]] bool peer_is_in_use(tr_peer_info const& peer_info) const;
void cancelOldRequests()
{
@ -412,10 +230,8 @@ public:
{
auto const lock = unique_lock();
auto* const atom = peer->atom;
TR_ASSERT(atom != nullptr);
atom->time = tr_time();
auto* const peer_info = peer->peer_info;
TR_ASSERT(peer_info != nullptr);
if (auto iter = std::find(std::begin(peers), std::end(peers), peer); iter != std::end(peers))
{
@ -423,7 +239,7 @@ public:
}
--stats.peer_count;
--stats.peer_from_count[atom->fromFirst];
--stats.peer_from_count[peer_info->from_first()];
TR_ASSERT(stats.peer_count == peerCount());
@ -462,7 +278,7 @@ public:
if (++peer->strikes >= MaxBadPiecesPerPeer)
{
peer->atom->ban();
peer->peer_info->ban();
peer->do_purge = true;
tr_logAddTraceSwarm(this, fmt::format("banning peer {}", peer->display_name()));
}
@ -490,40 +306,40 @@ public:
pool_is_all_seeds_ = std::all_of(
std::begin(pool),
std::end(pool),
[](auto const& key_val) { return key_val.second.isSeed(); });
[](auto const& key_val) { return key_val.second.is_seed(); });
}
return *pool_is_all_seeds_;
}
[[nodiscard]] peer_atom* get_existing_atom(tr_socket_address const& socket_address) noexcept
[[nodiscard]] tr_peer_info* get_existing_peer_info(tr_socket_address const& socket_address) noexcept
{
auto&& it = pool.find(socket_address);
return it != pool.end() ? &it->second : nullptr;
}
peer_atom* ensure_atom_exists(tr_socket_address const& socket_address, uint8_t const flags, uint8_t const from)
tr_peer_info& ensure_info_exists(tr_socket_address const& socket_address, uint8_t const flags, tr_peer_from const from)
{
TR_ASSERT(socket_address.is_valid());
TR_ASSERT(from < TR_PEER_FROM__MAX);
auto&& [atom_it, is_new] = pool.try_emplace(socket_address, socket_address, flags, from);
peer_atom* atom = &atom_it->second;
auto&& [info_it, is_new] = pool.try_emplace(socket_address, socket_address, flags, from);
auto& peer_info = info_it->second;
if (!is_new)
{
atom->fromBest = std::min(atom->fromBest, from);
atom->flags |= flags;
peer_info.found_at(from);
peer_info.set_pex_flags(flags);
}
mark_all_seeds_flag_dirty();
return atom;
return peer_info;
}
void mark_atom_as_seed(peer_atom& atom)
void mark_peer_as_seed(tr_peer_info& peer_info)
{
tr_logAddTraceSwarm(this, fmt::format("marking peer {} as a seed", atom.display_name()));
atom.flags |= ADDED_F_SEED_FLAG;
tr_logAddTraceSwarm(this, fmt::format("marking peer {} as a seed", peer_info.display_name()));
peer_info.set_seed();
mark_all_seeds_flag_dirty();
}
@ -546,10 +362,10 @@ public:
tor->set_dirty();
tor->session->add_uploaded(event.length);
if (peer->atom != nullptr)
{
peer->atom->piece_data_time = now;
}
// this should always be a tr_peerMsgs since it's
// impossible to upload piece data to a webseed...
TR_ASSERT(peer->peer_info != nullptr);
peer->peer_info->set_latest_piece_data_time(now);
break;
}
@ -564,9 +380,9 @@ public:
tor->set_dirty();
tor->session->add_downloaded(event.length);
if (peer->atom != nullptr)
if (auto* const info = peer->peer_info; info != nullptr)
{
peer->atom->piece_data_time = now;
info->set_latest_piece_data_time(now);
}
break;
@ -589,9 +405,9 @@ public:
break;
case tr_peer_event::Type::ClientGotPort:
if (peer->atom != nullptr)
if (auto* const info = peer->peer_info; info != nullptr)
{
peer->atom->port() = event.port;
info->port() = event.port;
}
break;
@ -653,7 +469,7 @@ public:
// tr_peers hold pointers to the items in this container,
// therefore references to elements within cannot invalidate
std::unordered_map<tr_socket_address, peer_atom> pool;
std::unordered_map<tr_socket_address, tr_peer_info> pool;
tr_peerMsgs* optimistic = nullptr; /* the optimistic peer, or nullptr if none */
@ -694,7 +510,7 @@ private:
for (auto& [socket_address, atom] : pool)
{
mark_atom_as_seed(atom);
mark_peer_as_seed(atom);
}
mark_all_seeds_flag_dirty();
@ -756,7 +572,7 @@ private:
if (peer->isSeed())
{
mark_atom_as_seed(*peer->atom);
mark_peer_as_seed(*peer->peer_info);
}
}
}
@ -871,7 +687,7 @@ private:
{
for (auto& [socket_address, atom] : tor->swarm->pool)
{
atom.setBlocklistedDirty();
atom.set_blocklisted_dirty();
}
}
}
@ -887,12 +703,16 @@ private:
// --- tr_peer virtual functions
tr_peer::tr_peer(tr_torrent const* tor, peer_atom* atom_in)
tr_peer::tr_peer(tr_torrent const* tor, tr_peer_info* peer_info_in)
: session{ tor->session }
, swarm{ tor->swarm }
, atom{ atom_in }
, peer_info{ peer_info_in }
, blame{ tor->block_count() }
{
if (auto* const info = peer_info; info != nullptr)
{
info->set_connected(tr_time());
}
}
tr_peer::~tr_peer()
@ -902,9 +722,9 @@ tr_peer::~tr_peer()
swarm->active_requests.remove(this);
}
if (atom != nullptr)
if (auto* const info = peer_info; info != nullptr)
{
atom->is_connected = false;
info->set_connected(tr_time(), false);
}
}
@ -922,20 +742,6 @@ void tr_peerMgrFree(tr_peerMgr* manager)
// ---
void tr_peerMgrSetUtpSupported(peer_atom* atom)
{
TR_ASSERT(atom != nullptr);
atom->flags |= ADDED_F_UTP_FLAGS;
}
void tr_peerMgrSetUtpFailed(peer_atom* atom, bool failed)
{
TR_ASSERT(atom != nullptr);
atom->utp_failed = failed;
}
/**
* REQUESTS
*
@ -1065,24 +871,23 @@ namespace
{
namespace handshake_helpers
{
void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, struct peer_atom* atom, tr_interned_string client)
void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, tr_peer_info* peer_info, tr_interned_string client)
{
TR_ASSERT(atom != nullptr);
TR_ASSERT(peer_info != nullptr);
TR_ASSERT(tr_isTorrent(tor));
TR_ASSERT(tor->swarm != nullptr);
tr_swarm* swarm = tor->swarm;
auto* peer = tr_peerMsgsNew(tor, atom, std::move(io), client, &tr_swarm::peerCallbackFunc, swarm);
atom->is_connected = true;
auto* peer = tr_peerMsgsNew(tor, peer_info, std::move(io), client, &tr_swarm::peerCallbackFunc, swarm);
swarm->peers.push_back(peer);
++swarm->stats.peer_count;
++swarm->stats.peer_from_count[atom->fromFirst];
++swarm->stats.peer_from_count[peer_info->from_first()];
TR_ASSERT(swarm->stats.peer_count == swarm->peerCount());
TR_ASSERT(swarm->stats.peer_from_count[atom->fromFirst] <= swarm->stats.peer_count);
TR_ASSERT(swarm->stats.peer_from_count[peer_info->from_first()] <= swarm->stats.peer_count);
}
/* FIXME: this is kind of a mess. */
@ -1112,9 +917,9 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, str
{
if (s != nullptr)
{
if (peer_atom* const atom = s->get_existing_atom(socket_address); atom != nullptr)
if (auto* const info = s->get_existing_peer_info(socket_address); info != nullptr)
{
++atom->num_fails;
info->on_connection_failed();
if (!result.read_anything_from_peer)
{
@ -1122,43 +927,37 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, str
s,
fmt::format(
"marking peer {} as unreachable... num_fails is {}",
atom->display_name(),
atom->num_fails));
atom->set_unreachable();
info->display_name(),
info->connection_failure_count()));
info->set_connectable(false);
}
}
}
}
else /* looking good */
{
struct peer_atom* atom = s->ensure_atom_exists(socket_address, 0, TR_PEER_FROM_INCOMING);
atom->time = tr_time();
atom->piece_data_time = 0;
atom->lastConnectionAt = tr_time();
auto& info = s->ensure_info_exists(socket_address, 0, TR_PEER_FROM_INCOMING);
if (!result.io->is_incoming())
{
atom->flags |= ADDED_F_CONNECTABLE;
atom->set_reachable();
info.set_connectable();
}
/* In principle, this flag specifies whether the peer groks µTP,
not whether it's currently connected over µTP. */
// If we're connected via µTP, then we know the peer supports µTP...
if (result.io->is_utp())
{
atom->flags |= ADDED_F_UTP_FLAGS;
info.set_utp_supported();
}
if (atom->is_banned())
if (info.is_banned())
{
tr_logAddTraceSwarm(s, fmt::format("banned peer {} tried to reconnect", atom->display_name()));
tr_logAddTraceSwarm(s, fmt::format("banned peer {} tried to reconnect", info.display_name()));
}
else if (result.io->is_incoming() && s->peerCount() >= s->tor->peer_limit())
{
/* too many peers already */
// too many peers already
}
else if (atom->is_connected)
else if (info.is_connected())
{
// we're already connected to this peer; do nothing
}
@ -1173,7 +972,7 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr<tr_peerIo> io, str
}
result.io->set_bandwidth(&s->tor->bandwidth_);
create_bit_torrent_peer(s->tor, result.io, atom, client);
create_bit_torrent_peer(s->tor, result.io, &info, client);
success = true;
}
@ -1214,7 +1013,7 @@ void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_peer_socket&& socket)
}
}
size_t tr_peerMgrAddPex(tr_torrent* tor, uint8_t from, tr_pex const* pex, size_t n_pex)
size_t tr_peerMgrAddPex(tr_torrent* tor, tr_peer_from from, tr_pex const* pex, size_t n_pex)
{
size_t n_used = 0;
tr_swarm* s = tor->swarm;
@ -1225,7 +1024,7 @@ size_t tr_peerMgrAddPex(tr_torrent* tor, uint8_t from, tr_pex const* pex, size_t
if (tr_isPex(pex) && /* safeguard against corrupt data */
!s->manager->session->addressIsBlocked(pex->addr) && pex->is_valid_for_peers())
{
s->ensure_atom_exists({ pex->addr, pex->port }, pex->flags, from);
s->ensure_info_exists({ pex->addr, pex->port }, pex->flags, from);
++n_used;
}
}
@ -1291,50 +1090,50 @@ namespace get_peers_helpers
/* better goes first */
constexpr struct
{
[[nodiscard]] constexpr static int compare(peer_atom const& a, peer_atom const& b) noexcept // <=>
[[nodiscard]] constexpr static int compare(tr_peer_info const& a, tr_peer_info const& b) noexcept // <=>
{
if (auto const val = tr_compare_3way(a.piece_data_time, b.piece_data_time); val != 0)
if (auto const val = a.compare_by_piece_data_time(b); val != 0)
{
return -val;
}
if (auto const val = tr_compare_3way(a.fromBest, b.fromBest); val != 0)
if (auto const val = tr_compare_3way(a.from_best(), b.from_best()); val != 0)
{
return val;
}
return tr_compare_3way(a.num_fails, b.num_fails);
return a.compare_by_failure_count(b);
}
[[nodiscard]] constexpr bool operator()(peer_atom const& a, peer_atom const& b) const noexcept
[[nodiscard]] constexpr bool operator()(tr_peer_info const& a, tr_peer_info const& b) const noexcept
{
return compare(a, b) < 0;
}
[[nodiscard]] constexpr bool operator()(peer_atom const* a, peer_atom const* b) const noexcept
[[nodiscard]] constexpr bool operator()(tr_peer_info const* a, tr_peer_info const* b) const noexcept
{
return compare(*a, *b) < 0;
}
} CompareAtomsByUsefulness{};
[[nodiscard]] bool isAtomInteresting(tr_torrent const* tor, peer_atom const& atom)
[[nodiscard]] bool is_peer_interesting(tr_torrent const* tor, tr_peer_info const& info)
{
if (tor->is_done() && atom.isSeed())
if (tor->is_done() && info.is_seed())
{
return false;
}
if (tor->swarm->peer_is_in_use(atom))
if (tor->swarm->peer_is_in_use(info))
{
return true;
}
if (atom.isBlocklisted(tor->session))
if (info.is_blocklisted(tor->session))
{
return false;
}
if (atom.is_banned())
if (info.is_banned())
{
return false;
}
@ -1359,45 +1158,45 @@ std::vector<tr_pex> tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t address_ty
// build a list of atoms
auto atoms = std::vector<peer_atom const*>{};
auto infos = std::vector<tr_peer_info const*>{};
if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
{
auto const& peers = s->peers;
atoms.reserve(std::size(peers));
infos.reserve(std::size(peers));
std::transform(
std::begin(peers),
std::end(peers),
std::back_inserter(atoms),
[](auto const* peer) { return peer->atom; });
std::back_inserter(infos),
[](auto const* peer) { return peer->peer_info; });
}
else /* TR_PEERS_INTERESTING */
{
for (auto const& [socket_address, atom] : s->pool)
for (auto const& [socket_address, peer_info] : s->pool)
{
if (isAtomInteresting(tor, atom))
if (is_peer_interesting(tor, peer_info))
{
atoms.push_back(&atom);
infos.push_back(&peer_info);
}
}
}
std::sort(std::begin(atoms), std::end(atoms), CompareAtomsByUsefulness);
std::sort(std::begin(infos), std::end(infos), CompareAtomsByUsefulness);
// add the first N of them into our return list
auto const n = std::min(std::size(atoms), max_peer_count);
auto const n = std::min(std::size(infos), max_peer_count);
auto pex = std::vector<tr_pex>{};
pex.reserve(n);
for (size_t i = 0; i < std::size(atoms) && std::size(pex) < n; ++i)
for (size_t i = 0; i < std::size(infos) && std::size(pex) < n; ++i)
{
auto const* const atom = atoms[i];
auto const& [addr, port] = atom->socket_address;
auto const* const info = infos[i];
auto const& [addr, port] = info->socket_address();
if (addr.type == address_type)
{
TR_ASSERT(addr.is_valid());
pex.emplace_back(addr, port, atom->flags);
pex.emplace_back(addr, port, info->pex_flags());
}
}
@ -1538,14 +1337,13 @@ namespace peer_stat_helpers
[[nodiscard]] auto get_peer_stats(tr_peerMsgs const* peer, time_t now, uint64_t now_msec)
{
auto stats = tr_peer_stat{};
auto const* const atom = peer->atom;
auto const [addr, port] = peer->socketAddress();
addr.display_name(stats.addr, sizeof(stats.addr));
stats.client = peer->user_agent().c_str();
stats.port = port.host();
stats.from = atom->fromFirst;
stats.from = peer->peer_info->from_first();
stats.progress = peer->percentDone();
stats.isUTP = peer->is_utp_connection();
stats.isEncrypted = peer->is_encrypted();
@ -1954,12 +1752,12 @@ auto constexpr MaxUploadIdleSecs = time_t{ 60 * 5 };
}
auto const* tor = s->tor;
auto const* const atom = peer->atom;
auto const* const info = peer->peer_info;
/* disconnect if we're both seeds and enough time has passed for PEX */
if (tor->is_done() && peer->isSeed())
{
return !tor->allows_pex() || now - atom->time >= 30;
return !tor->allows_pex() || info->idle_secs(now).value_or(0U) >= 30U;
}
/* disconnect if it's been too long since piece data has been transferred.
@ -1974,16 +1772,15 @@ auto constexpr MaxUploadIdleSecs = time_t{ 60 * 5 };
auto const lo = MinUploadIdleSecs;
auto const hi = MaxUploadIdleSecs;
time_t const limit = hi - (hi - lo) * strictness;
time_t const idle_time = now - std::max(atom->time, atom->piece_data_time);
if (idle_time > limit)
if (auto const idle_secs = info->idle_secs(now); idle_secs && *idle_secs > limit)
{
tr_logAddTraceSwarm(
s,
fmt::format(
"purging peer {} because it's been {} secs since we shared anything",
peer->display_name(),
idle_time));
*idle_secs));
return true;
}
}
@ -1994,23 +1791,7 @@ auto constexpr MaxUploadIdleSecs = time_t{ 60 * 5 };
void closePeer(tr_peer* peer)
{
TR_ASSERT(peer != nullptr);
auto const* const s = peer->swarm;
/* if we transferred piece data, then they might be good peers,
so reset their `num_fails' weight to zero. otherwise we connected
to them fruitlessly, so mark it as another fail */
if (auto* const atom = peer->atom; atom->piece_data_time != 0)
{
tr_logAddTraceSwarm(s, fmt::format("resetting atom {} num_fails to 0", peer->display_name()));
atom->num_fails = 0;
}
else
{
++atom->num_fails;
tr_logAddTraceSwarm(s, fmt::format("incremented atom {} num_fails to {}", peer->display_name(), atom->num_fails));
}
tr_logAddTraceSwarm(s, fmt::format("removing bad peer {}", peer->display_name()));
tr_logAddTraceSwarm(peer->swarm, fmt::format("removing bad peer {}", peer->display_name()));
peer->swarm->removePeer(peer);
}
@ -2023,14 +1804,7 @@ constexpr struct
return a->do_purge ? 1 : -1;
}
/* the one to give us data more recently goes first */
if (auto const val = tr_compare_3way(a->atom->piece_data_time, b->atom->piece_data_time); val != 0)
{
return -val;
}
/* the one we connected to most recently goes first */
return -tr_compare_3way(a->atom->time, b->atom->time);
return -a->peer_info->compare_by_piece_data_time(*b->peer_info);
}
[[nodiscard]] constexpr bool operator()(tr_peer const* a, tr_peer const* b) const // less than
@ -2222,10 +1996,10 @@ void tr_peerMgr::bandwidthPulse()
// ---
bool tr_swarm::peer_is_in_use(peer_atom const& atom) const
bool tr_swarm::peer_is_in_use(tr_peer_info const& peer_info) const
{
return atom.is_connected || outgoing_handshakes.count(atom.socket_address) != 0U ||
manager->incoming_handshakes.count(atom.socket_address) != 0U;
return peer_info.is_connected() || outgoing_handshakes.count(peer_info.socket_address()) != 0U ||
manager->incoming_handshakes.count(peer_info.socket_address()) != 0U;
}
namespace
@ -2233,40 +2007,40 @@ namespace
namespace connect_helpers
{
/* is this atom someone that we'd want to initiate a connection to? */
[[nodiscard]] bool isPeerCandidate(tr_torrent const* tor, peer_atom const& atom, time_t const now)
[[nodiscard]] bool isPeerCandidate(tr_torrent const* tor, tr_peer_info const& peer_info, time_t const now)
{
// have we already tried and failed to connect?
if (atom.is_unreachable())
if (auto const conn = peer_info.is_connectable(); conn && !*conn)
{
return false;
}
// not if we're both seeds
if (tor->is_done() && atom.isSeed())
if (tor->is_done() && peer_info.is_seed())
{
return false;
}
// not if we've already got a connection to them...
if (tor->swarm->peer_is_in_use(atom))
if (tor->swarm->peer_is_in_use(peer_info))
{
return false;
}
// not if we just tried them already
if (now - atom.time < atom.getReconnectIntervalSecs(now))
if (!peer_info.reconnect_interval_has_passed(now))
{
return false;
}
// not if they're blocklisted
if (atom.isBlocklisted(tor->session))
if (peer_info.is_blocklisted(tor->session))
{
return false;
}
// not if they're banned...
if (atom.is_banned())
if (peer_info.is_banned())
{
return false;
}
@ -2278,16 +2052,16 @@ struct peer_candidate
{
peer_candidate() = default;
peer_candidate(uint64_t score_in, tr_torrent* tor_in, peer_atom* atom_in)
peer_candidate(uint64_t score_in, tr_torrent* tor_in, tr_peer_info* peer_info_in)
: score{ score_in }
, tor{ tor_in }
, atom{ atom_in }
, peer_info{ peer_info_in }
{
}
uint64_t score;
tr_torrent* tor;
peer_atom* atom;
tr_peer_info* peer_info;
};
[[nodiscard]] bool torrentWasRecentlyStarted(tr_torrent const* tor)
@ -2303,18 +2077,17 @@ struct peer_candidate
}
/* smaller value is better */
[[nodiscard]] uint64_t getPeerCandidateScore(tr_torrent const* tor, peer_atom const& atom, uint8_t salt)
[[nodiscard]] uint64_t getPeerCandidateScore(tr_torrent const* tor, tr_peer_info const& peer_info, uint8_t salt)
{
auto i = uint64_t{};
auto score = uint64_t{};
bool const failed = atom.lastConnectionAt < atom.lastConnectionAttemptAt;
/* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
i = failed ? 1 : 0;
i = peer_info.connection_failure_count() != 0U ? 1U : 0U;
score = addValToKey(score, 1, i);
/* prefer the one we attempted least recently (to cycle through all peers) */
i = atom.lastConnectionAttemptAt;
i = peer_info.connection_attempt_time();
score = addValToKey(score, 32, i);
/* prefer peers belonging to a torrent of a higher priority */
@ -2344,16 +2117,16 @@ struct peer_candidate
score = addValToKey(score, 1, i);
/* prefer peers that are known to be connectible */
i = (atom.flags & ADDED_F_CONNECTABLE) != 0 ? 0 : 1;
i = peer_info.is_connectable().value_or(false) ? 0 : 1;
score = addValToKey(score, 1, i);
/* prefer peers that we might be able to upload to */
i = (atom.flags & ADDED_F_SEED_FLAG) == 0 ? 0 : 1;
i = peer_info.is_seed() ? 0 : 1;
score = addValToKey(score, 1, i);
/* Prefer peers that we got from more trusted sources.
* lower `fromBest` values indicate more trusted sources */
score = addValToKey(score, 4, atom.fromBest);
score = addValToKey(score, 4, peer_info.from_best());
/* salt */
score = addValToKey(score, 8, salt);
@ -2373,7 +2146,7 @@ struct peer_candidate
}
auto candidates = std::vector<peer_candidate>{};
candidates.reserve(peer_atom::atom_count());
candidates.reserve(tr_peer_info::known_peer_count());
/* populate the candidate array */
auto salter = tr_salt_shaker{};
@ -2430,26 +2203,17 @@ struct peer_candidate
auto ret = tr_peerMgr::OutboundCandidates{};
for (auto it = std::crbegin(candidates), end = std::crend(candidates); it != end; ++it)
{
ret.emplace_back(it->tor->id(), it->atom->socket_address);
ret.emplace_back(it->tor->id(), it->peer_info->socket_address());
}
return ret;
}
void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, peer_atom& atom)
void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, tr_peer_info& peer_info)
{
using namespace handshake_helpers;
auto const now = tr_time();
bool utp = mgr->session->allowsUTP() && !atom.utp_failed;
if (atom.fromFirst == TR_PEER_FROM_PEX)
{
/* PEX has explicit signalling for µTP support. If an atom
originally came from PEX and doesn't have the µTP flag, skip the
µTP connection attempt. Are we being optimistic here? */
utp = utp && (atom.flags & ADDED_F_UTP_FLAGS) != 0;
}
auto const utp = mgr->session->allowsUTP() && peer_info.supports_utp().value_or(true);
auto* const session = mgr->session;
if (tr_peer_socket::limit_reached(session) || (!utp && !session->allowsTCP()))
@ -2459,34 +2223,33 @@ void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, peer_atom& atom)
tr_logAddTraceSwarm(
s,
fmt::format("Starting an OUTGOING {} connection with {}", utp ? " µTP" : "TCP", atom.display_name()));
fmt::format("Starting an OUTGOING {} connection with {}", utp ? " µTP" : "TCP", peer_info.display_name()));
auto peer_io = tr_peerIo::new_outgoing(
session,
&session->top_bandwidth_,
atom.socket_address,
peer_info.socket_address(),
s->tor->info_hash(),
s->tor->completeness == TR_SEED,
utp);
if (!peer_io)
{
tr_logAddTraceSwarm(s, fmt::format("peerIo not created; marking peer {} as unreachable", atom.display_name()));
atom.set_unreachable();
++atom.num_fails;
tr_logAddTraceSwarm(s, fmt::format("peerIo not created; marking peer {} as unreachable", peer_info.display_name()));
peer_info.set_connectable(false);
peer_info.on_connection_failed();
}
else
{
s->outgoing_handshakes.try_emplace(
atom.socket_address,
peer_info.socket_address(),
&mgr->handshake_mediator_,
peer_io,
session->encryptionMode(),
[mgr](tr_handshake::Result const& result) { return on_handshake_done(mgr, result); });
}
atom.lastConnectionAttemptAt = now;
atom.time = now;
peer_info.set_connection_attempt_time(now);
}
} // namespace connect_helpers
} // namespace
@ -2513,9 +2276,9 @@ void tr_peerMgr::make_new_peer_connections()
if (auto* const tor = session->torrents().get(tor_id); tor != nullptr)
{
if (auto* const atom = tor->swarm->get_existing_atom(sock_addr); atom != nullptr)
if (auto* const peer_info = tor->swarm->get_existing_peer_info(sock_addr); peer_info != nullptr)
{
initiateConnection(this, tor->swarm, *atom);
initiateConnection(this, tor->swarm, *peer_info);
}
}
}
@ -2528,9 +2291,9 @@ void HandshakeMediator::set_utp_failed(tr_sha1_digest_t const& info_hash, tr_soc
{
if (auto* const tor = session_.torrents().get(info_hash); tor != nullptr)
{
if (auto* const atom = tor->swarm->get_existing_atom(socket_address); atom != nullptr)
if (auto* const peer_info = tor->swarm->get_existing_peer_info(socket_address); peer_info != nullptr)
{
tr_peerMgrSetUtpFailed(atom, true);
peer_info->set_utp_supported(false);
}
}
}

View File

@ -9,14 +9,20 @@
#error only libtransmission should #include this header.
#endif
#include <atomic>
#include <cstddef> // size_t
#include <cstdint> // uint8_t, uint64_t
#include <ctime>
#include <limits>
#include <optional>
#include <string>
#include <vector>
#include "libtransmission/transmission.h" // tr_block_span_t (ptr only)
#include "libtransmission/net.h" /* tr_address */
#include "libtransmission/tr-assert.h"
#include "libtransmission/utils.h" /* tr_compare_3way */
/**
* @addtogroup peers Peers
@ -24,8 +30,8 @@
*/
class tr_peer;
struct tr_peerMgr;
class tr_peer_socket;
struct tr_peerMgr;
struct tr_peer_stat;
struct tr_session;
struct tr_torrent;
@ -46,6 +52,346 @@ enum
ADDED_F_CONNECTABLE = 16
};
/**
* Peer information that should be retained even when not connected,
* e.g. to help us decide which peers to connect to.
*/
class tr_peer_info
{
public:
tr_peer_info(tr_socket_address socket_address, uint8_t pex_flags, tr_peer_from from)
: socket_address_{ std::move(socket_address) }
, from_first_{ from }
, from_best_{ from }
{
++n_known_peers;
set_pex_flags(pex_flags);
}
tr_peer_info(tr_peer_info&&) = delete;
tr_peer_info(tr_peer_info const&) = delete;
tr_peer_info& operator=(tr_peer_info&&) = delete;
tr_peer_info& operator=(tr_peer_info const&) = delete;
~tr_peer_info()
{
[[maybe_unused]] auto const n_prev = n_known_peers--;
TR_ASSERT(n_prev > 0U);
}
[[nodiscard]] static auto known_peer_count() noexcept
{
return n_known_peers.load();
}
// ---
[[nodiscard]] constexpr auto const& socket_address() const noexcept
{
return socket_address_;
}
[[nodiscard]] constexpr auto& port() noexcept
{
return socket_address_.port_;
}
[[nodiscard]] auto display_name() const
{
return socket_address_.display_name();
}
// ---
[[nodiscard]] constexpr auto from_first() const noexcept
{
return from_first_;
}
[[nodiscard]] constexpr auto from_best() const noexcept
{
return from_best_;
}
constexpr void found_at(tr_peer_from from) noexcept
{
from_best_ = std::min(from_best_, from);
}
// ---
constexpr void set_seed(bool seed = true) noexcept
{
is_seed_ = seed;
}
[[nodiscard]] constexpr auto is_seed() const noexcept
{
return is_seed_;
}
// ---
void set_connectable(bool value = true) noexcept
{
is_connectable_ = value;
}
[[nodiscard]] constexpr auto const& is_connectable() const noexcept
{
return is_connectable_;
}
// ---
void set_utp_supported(bool value = true) noexcept
{
is_utp_supported_ = value;
}
[[nodiscard]] constexpr auto supports_utp() const noexcept
{
return is_utp_supported_;
}
// ---
[[nodiscard]] constexpr auto compare_by_failure_count(tr_peer_info const& that) const noexcept
{
return tr_compare_3way(num_consecutive_fails_, that.num_consecutive_fails_);
}
[[nodiscard]] constexpr auto compare_by_piece_data_time(tr_peer_info const& that) const noexcept
{
return tr_compare_3way(piece_data_at_, that.piece_data_at_);
}
// ---
constexpr auto set_connected(time_t now, bool is_connected = true) noexcept
{
connection_changed_at_ = now;
is_connected_ = is_connected;
if (is_connected_)
{
num_consecutive_fails_ = {};
piece_data_at_ = {};
}
}
[[nodiscard]] constexpr auto is_connected() const noexcept
{
return is_connected_;
}
// ---
[[nodiscard]] bool is_blocklisted(tr_session const* session) const;
void set_blocklisted_dirty()
{
blocklisted_.reset();
}
// ---
constexpr void ban() noexcept
{
is_banned_ = true;
}
[[nodiscard]] constexpr auto is_banned() const noexcept
{
return is_banned_;
}
// ---
[[nodiscard]] constexpr auto connection_attempt_time() const noexcept
{
return connection_attempted_at_;
}
constexpr void set_connection_attempt_time(time_t value) noexcept
{
connection_attempted_at_ = value;
}
constexpr void set_latest_piece_data_time(time_t value) noexcept
{
piece_data_at_ = value;
}
[[nodiscard]] constexpr bool has_transferred_piece_data() const noexcept
{
return piece_data_at_ != time_t{};
}
[[nodiscard]] constexpr auto reconnect_interval_has_passed(time_t const now) const noexcept
{
auto const interval = now - std::max(connection_attempted_at_, connection_changed_at_);
return interval >= get_reconnect_interval_secs(now);
}
[[nodiscard]] constexpr std::optional<time_t> idle_secs(time_t now) const noexcept
{
if (!is_connected_)
{
return {};
}
return now - std::max(piece_data_at_, connection_changed_at_);
}
// ---
constexpr void on_connection_failed() noexcept
{
if (num_consecutive_fails_ != std::numeric_limits<decltype(num_consecutive_fails_)>::max())
{
++num_consecutive_fails_;
}
}
[[nodiscard]] constexpr auto connection_failure_count() const noexcept
{
return num_consecutive_fails_;
}
// ---
constexpr void set_pex_flags(uint8_t pex_flags) noexcept
{
pex_flags_ = pex_flags;
if ((pex_flags & ADDED_F_CONNECTABLE) != 0U)
{
set_connectable();
}
if ((pex_flags & ADDED_F_UTP_FLAGS) != 0U)
{
set_utp_supported();
}
is_seed_ = (pex_flags & ADDED_F_SEED_FLAG) != 0U;
}
[[nodiscard]] constexpr uint8_t pex_flags() const noexcept
{
auto ret = pex_flags_;
if (is_connectable_)
{
if (*is_connectable_)
{
ret |= ADDED_F_CONNECTABLE;
}
else
{
ret &= ~ADDED_F_CONNECTABLE;
}
}
if (is_utp_supported_)
{
if (*is_utp_supported_)
{
ret |= ADDED_F_UTP_FLAGS;
}
else
{
ret &= ~ADDED_F_UTP_FLAGS;
}
}
if (is_seed_)
{
ret |= ADDED_F_SEED_FLAG;
}
return ret;
}
private:
[[nodiscard]] constexpr tr_address const& addr() const noexcept
{
return socket_address_.address();
}
[[nodiscard]] constexpr auto port() const noexcept
{
return socket_address_.port();
}
[[nodiscard]] constexpr time_t get_reconnect_interval_secs(time_t const now) const noexcept
{
// if we were recently connected to this peer and transferring piece
// data, try to reconnect to them sooner rather that later -- we don't
// want network troubles to get in the way of a good peer.
auto const unreachable = is_connectable_ && !*is_connectable_;
if (!unreachable && now - piece_data_at_ <= MinimumReconnectIntervalSecs * 2)
{
return MinimumReconnectIntervalSecs;
}
// otherwise, the interval depends on how many times we've tried
// and failed to connect to the peer. Penalize peers that were
// unreachable the last time we tried
auto step = this->num_consecutive_fails_;
if (unreachable)
{
step += 2;
}
switch (step)
{
case 0:
return 0U;
case 1:
return 10U;
case 2:
return 60U * 2U;
case 3:
return 60U * 15U;
case 4:
return 60U * 30U;
case 5:
return 60U * 60U;
default:
return 60U * 120U;
}
}
// the minimum we'll wait before attempting to reconnect to a peer
static auto constexpr MinimumReconnectIntervalSecs = time_t{ 5U };
static auto inline n_known_peers = std::atomic<size_t>{};
tr_socket_address socket_address_;
time_t connection_attempted_at_ = {};
time_t connection_changed_at_ = {};
time_t piece_data_at_ = {};
mutable std::optional<bool> blocklisted_;
std::optional<bool> is_connectable_;
std::optional<bool> is_utp_supported_;
tr_peer_from from_first_; // where the peer was first found
tr_peer_from from_best_; // the "best" place where this peer was found
uint8_t num_consecutive_fails_ = {};
uint8_t pex_flags_ = {};
bool is_banned_ = false;
bool is_connected_ = false;
bool is_seed_ = false;
};
struct tr_pex
{
tr_pex() = default;
@ -156,10 +502,6 @@ constexpr bool tr_isPex(tr_pex const* pex)
void tr_peerMgrFree(tr_peerMgr* manager);
void tr_peerMgrSetUtpSupported(struct peer_atom* atom);
void tr_peerMgrSetUtpFailed(struct peer_atom*, bool failed);
[[nodiscard]] std::vector<tr_block_span_t> tr_peerMgrGetNextRequests(tr_torrent* torrent, tr_peer const* peer, size_t numwant);
[[nodiscard]] bool tr_peerMgrDidPeerRequest(tr_torrent const* torrent, tr_peer const* peer, tr_block_index_t block);
@ -170,7 +512,7 @@ void tr_peerMgrClientSentRequests(tr_torrent* torrent, tr_peer* peer, tr_block_s
void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_peer_socket&& socket);
size_t tr_peerMgrAddPex(tr_torrent* tor, uint8_t from, tr_pex const* pex, size_t n_pex);
size_t tr_peerMgrAddPex(tr_torrent* tor, tr_peer_from from, tr_pex const* pex, size_t n_pex);
enum
{

View File

@ -46,7 +46,7 @@
#include "libtransmission/variant.h"
#include "libtransmission/version.h"
struct peer_atom;
class tr_peer_info;
struct tr_error;
#ifndef EBADMSG
@ -317,20 +317,20 @@ using ReadResult = std::pair<ReadState, size_t /*n_piece_data_bytes_read*/>;
* stored in tr_peer, where it can be accessed by both peermsgs and
* the peer manager.
*
* @see struct peer_atom
* @see tr_peer
* @see tr_peer_info
*/
class tr_peerMsgsImpl final : public tr_peerMsgs
{
public:
tr_peerMsgsImpl(
tr_torrent* torrent_in,
peer_atom* atom_in,
tr_peer_info* const peer_info_in,
std::shared_ptr<tr_peerIo> io_in,
tr_interned_string client,
tr_peer_callback callback,
void* callback_data)
: tr_peerMsgs{ torrent_in, atom_in, client, io_in->is_encrypted(), io_in->is_incoming(), io_in->is_utp() }
: tr_peerMsgs{ torrent_in, peer_info_in, client, io_in->is_encrypted(), io_in->is_incoming(), io_in->is_utp() }
, torrent{ torrent_in }
, io{ std::move(io_in) }
, have_{ torrent_in->piece_count() }
@ -343,12 +343,6 @@ public:
pex_timer_->start_repeating(SendPexInterval);
}
if (io->supports_utp())
{
tr_peerMgrSetUtpSupported(atom);
tr_peerMgrSetUtpFailed(atom, false);
}
if (io->supports_ltep())
{
sendLtepHandshake(this);
@ -1099,9 +1093,10 @@ void parseLtepHandshake(tr_peerMsgsImpl* msgs, MessageReader& payload)
if (tr_variantDictFindInt(sub, TR_KEY_ut_holepunch, &i))
{
/* Mysterious µTorrent extension that we don't grok. However,
it implies support for µTP, so use it to indicate that. */
tr_peerMgrSetUtpFailed(msgs->atom, false);
// Transmission doesn't support this extension yet.
// But its presence does indicate µTP supports,
// which we do care about...
msgs->peer_info->set_utp_supported(true);
}
}
@ -2175,12 +2170,12 @@ tr_peerMsgs::~tr_peerMsgs()
}
tr_peerMsgs* tr_peerMsgsNew(
tr_torrent* torrent,
peer_atom* atom,
tr_torrent* const torrent,
tr_peer_info* const peer_info,
std::shared_ptr<tr_peerIo> io,
tr_interned_string user_agent,
tr_peer_callback callback,
void* callback_data)
{
return new tr_peerMsgsImpl(torrent, atom, std::move(io), user_agent, callback, callback_data);
return new tr_peerMsgsImpl(torrent, peer_info, std::move(io), user_agent, callback, callback_data);
}

View File

@ -21,8 +21,8 @@
#include "libtransmission/net.h" // tr_socket_address
#include "libtransmission/peer-common.h" // for tr_peer
struct peer_atom;
class tr_peerIo;
class tr_peer_info;
struct tr_torrent;
/**
@ -35,12 +35,12 @@ class tr_peerMsgs : public tr_peer
public:
tr_peerMsgs(
tr_torrent const* tor,
peer_atom* atom_in,
tr_peer_info* peer_info_in,
tr_interned_string user_agent,
bool connection_is_encrypted,
bool connection_is_incoming,
bool connection_is_utp)
: tr_peer{ tor, atom_in }
: tr_peer{ tor, peer_info_in }
, user_agent_{ user_agent }
, connection_is_encrypted_{ connection_is_encrypted }
, connection_is_incoming_{ connection_is_incoming }
@ -173,7 +173,7 @@ private:
tr_peerMsgs* tr_peerMsgsNew(
tr_torrent* torrent,
peer_atom* atom,
tr_peer_info* peer_info,
std::shared_ptr<tr_peerIo> io,
tr_interned_string user_agent,
tr_peer_callback callback,

View File

@ -1428,7 +1428,7 @@ enum tr_torrent_activity
TR_STATUS_SEED_WAIT = 5, /* Queued to seed */
TR_STATUS_SEED = 6 /* Seeding */
};
enum
enum tr_peer_from : uint8_t
{
TR_PEER_FROM_INCOMING = 0, /* connections made to the listening port */
TR_PEER_FROM_LPD, /* peers found by local announcements */