diff --git a/libtransmission/peer-mgr.cc b/libtransmission/peer-mgr.cc index 13317c188..2d373055f 100644 --- a/libtransmission/peer-mgr.cc +++ b/libtransmission/peer-mgr.cc @@ -12,14 +12,16 @@ #include // std::byte #include #include // time_t -#include #include // std::back_inserter +#include #include #include #include // std::tie #include #include +#include + #include #define LIBTRANSMISSION_PEER_MODULE @@ -797,6 +799,32 @@ private: struct tr_peerMgr { +private: + static auto constexpr BandwidthTimerPeriod = 500ms; + static auto constexpr RechokePeriod = 10s; + static auto constexpr RefillUpkeepPeriod = 10s; + + // Max number of outbound peer connections to initiate. + // This throttle is an arbitrary number to avoid overloading routers. + static auto constexpr MaxConnectionsPerSecond = size_t{ 18U }; + static auto constexpr MaxConnectionsPerPulse = size_t(MaxConnectionsPerSecond * BandwidthTimerPeriod / 1s); + + // Building a peer candidate list is expensive, so cache it across pulses. + // We want to cache it long enough to avoid excess CPU cycles, + // but short enough that the data isn't too stale. + static auto constexpr OutboundCandidatesListTtl = BandwidthTimerPeriod * 4U; + + // How big the candidate list should be when we create it. + static auto constexpr OutboundCandidateListCapacity = MaxConnectionsPerPulse * OutboundCandidatesListTtl / + BandwidthTimerPeriod; + +public: + // The peers we might try connecting to in the next few seconds. + // This list is cached between pulses so use resilient keys, e.g. + // a `tr_torrent_id_t` instead of a `tr_torrent*` that can be freed. + using OutboundCandidates = small:: + max_size_vector, OutboundCandidateListCapacity>; + explicit tr_peerMgr(tr_session* session_in) : session{ session_in } , handshake_mediator_{ *session } @@ -805,7 +833,7 @@ struct tr_peerMgr , refill_upkeep_timer_{ session->timerMaker().create([this]() { refillUpkeep(); }) } , blocklist_tag_{ session->blocklist_changed_.observe([this]() { on_blocklist_changed(); }) } { - bandwidth_timer_->start_repeating(BandwidthPeriod); + bandwidth_timer_->start_repeating(BandwidthTimerPeriod); rechoke_timer_->start_repeating(RechokePeriod); refill_upkeep_timer_->start_repeating(RefillUpkeepPeriod); } @@ -835,7 +863,7 @@ struct tr_peerMgr void rechokePulse() const; void reconnectPulse(); void refillUpkeep() const; - void makeNewPeerConnections(size_t max); + void make_new_peer_connections(); [[nodiscard]] tr_swarm* get_existing_swarm(tr_sha1_digest_t const& hash) const { @@ -868,22 +896,13 @@ private: } } + OutboundCandidates outbound_candidates_; + std::unique_ptr const bandwidth_timer_; std::unique_ptr const rechoke_timer_; std::unique_ptr const refill_upkeep_timer_; libtransmission::ObserverTag const blocklist_tag_; - - static auto constexpr BandwidthPeriod = 500ms; - static auto constexpr RechokePeriod = 10s; - static auto constexpr RefillUpkeepPeriod = 10s; - - // how frequently to decide which peers live and die - static auto constexpr ReconnectPeriodMsec = int{ 500 }; - - // max number of peers to ask for per second overall. - // this throttle is to avoid overloading the router - static auto constexpr MaxConnectionsPerSecond = size_t{ 12 }; }; // --- tr_peer virtual functions @@ -2172,8 +2191,7 @@ void tr_peerMgr::reconnectPulse() enforceSessionPeerLimit(session); // try to make new peer connections - auto const max_connections_per_pulse = int(MaxConnectionsPerSecond * (ReconnectPeriodMsec / 1000.0)); - makeNewPeerConnections(max_connections_per_pulse); + make_new_peer_connections(); } // --- Bandwidth Allocation @@ -2224,7 +2242,7 @@ void tr_peerMgr::bandwidthPulse() pumpAllPeers(this); // allocate bandwidth to the peers - static auto constexpr Msec = std::chrono::duration_cast(BandwidthPeriod).count(); + static auto constexpr Msec = std::chrono::duration_cast(BandwidthTimerPeriod).count(); session->top_bandwidth_.allocate(Msec); // torrent upkeep @@ -2297,6 +2315,15 @@ namespace connect_helpers struct peer_candidate { + peer_candidate() = default; + + peer_candidate(uint64_t score_in, tr_torrent* tor_in, peer_atom* atom_in) + : score{ score_in } + , tor{ tor_in } + , atom{ atom_in } + { + } + uint64_t score; tr_torrent* tor; peer_atom* atom; @@ -2373,8 +2400,7 @@ struct peer_candidate return score; } -/** @return an array of all the atoms we might want to connect to */ -[[nodiscard]] std::vector getPeerCandidates(tr_session* session, size_t max) +[[nodiscard]] tr_peerMgr::OutboundCandidates get_peer_candidates(tr_session* session) { auto const now = tr_time(); auto const now_msec = tr_time_msec(); @@ -2423,13 +2449,13 @@ struct peer_candidate { if (isPeerCandidate(tor, atom, now)) { - candidates.push_back({ getPeerCandidateScore(tor, atom, salter()), tor, &atom }); + candidates.emplace_back(getPeerCandidateScore(tor, atom, salter()), tor, &atom); } } } // only keep the best `max` candidates - if (std::size(candidates) > max) + if (auto const max = tr_peerMgr::OutboundCandidates::requested_inline_size; max < std::size(candidates)) { std::partial_sort( std::begin(candidates), @@ -2439,7 +2465,12 @@ struct peer_candidate candidates.resize(max); } - return candidates; + auto ret = tr_peerMgr::OutboundCandidates{}; + for (auto const& candidate : candidates) + { + ret.emplace_back(candidate.tor->id(), candidate.atom->socket_address); + } + return ret; } void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, peer_atom& atom) @@ -2498,16 +2529,34 @@ void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, peer_atom& atom) } // namespace connect_helpers } // namespace -void tr_peerMgr::makeNewPeerConnections(size_t max) +void tr_peerMgr::make_new_peer_connections() { using namespace connect_helpers; auto const lock = session->unique_lock(); - for (auto& candidate : getPeerCandidates(session, max)) + // get the candidates if we need to + auto& peers = outbound_candidates_; + if (std::empty(peers)) { - initiateConnection(this, candidate.tor->swarm, *candidate.atom); + peers = get_peer_candidates(session); } + + // initiate connections to the first N candidates + auto const n_this_pass = std::min(std::size(peers), MaxConnectionsPerPulse); + for (size_t i = 0; i < n_this_pass; ++i) + { + auto const& [tor_id, sock_addr] = peers[i]; + auto* const tor = session->torrents().get(tor_id); + auto* const atom = tor->swarm->get_existing_atom(sock_addr); + if (tor != nullptr && atom != nullptr) + { + initiateConnection(this, tor->swarm, *atom); + } + } + + // remove the first N candidates from the list + peers.erase(std::begin(peers), std::begin(peers) + n_this_pass); } // ---