1
0
Fork 0
mirror of https://github.com/transmission/transmission synced 2024-12-23 16:24:02 +00:00

perf: faster get peer candidates (#5734)

This commit is contained in:
Charles Kerr 2023-07-05 23:16:18 -05:00 committed by GitHub
parent 2211086338
commit 685e245f4a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -12,14 +12,16 @@
#include <cstddef> // std::byte
#include <cstdint>
#include <ctime> // time_t
#include <map>
#include <iterator> // std::back_inserter
#include <map>
#include <memory>
#include <optional>
#include <tuple> // std::tie
#include <utility>
#include <vector>
#include <small/vector.hpp>
#include <fmt/core.h>
#define LIBTRANSMISSION_PEER_MODULE
@ -797,6 +799,32 @@ private:
struct tr_peerMgr
{
private:
static auto constexpr BandwidthTimerPeriod = 500ms;
static auto constexpr RechokePeriod = 10s;
static auto constexpr RefillUpkeepPeriod = 10s;
// Max number of outbound peer connections to initiate.
// This throttle is an arbitrary number to avoid overloading routers.
static auto constexpr MaxConnectionsPerSecond = size_t{ 18U };
static auto constexpr MaxConnectionsPerPulse = size_t(MaxConnectionsPerSecond * BandwidthTimerPeriod / 1s);
// Building a peer candidate list is expensive, so cache it across pulses.
// We want to cache it long enough to avoid excess CPU cycles,
// but short enough that the data isn't too stale.
static auto constexpr OutboundCandidatesListTtl = BandwidthTimerPeriod * 4U;
// How big the candidate list should be when we create it.
static auto constexpr OutboundCandidateListCapacity = MaxConnectionsPerPulse * OutboundCandidatesListTtl /
BandwidthTimerPeriod;
public:
// The peers we might try connecting to in the next few seconds.
// This list is cached between pulses so use resilient keys, e.g.
// a `tr_torrent_id_t` instead of a `tr_torrent*` that can be freed.
using OutboundCandidates = small::
max_size_vector<std::pair<tr_torrent_id_t, tr_socket_address>, OutboundCandidateListCapacity>;
explicit tr_peerMgr(tr_session* session_in)
: session{ session_in }
, handshake_mediator_{ *session }
@ -805,7 +833,7 @@ struct tr_peerMgr
, refill_upkeep_timer_{ session->timerMaker().create([this]() { refillUpkeep(); }) }
, blocklist_tag_{ session->blocklist_changed_.observe([this]() { on_blocklist_changed(); }) }
{
bandwidth_timer_->start_repeating(BandwidthPeriod);
bandwidth_timer_->start_repeating(BandwidthTimerPeriod);
rechoke_timer_->start_repeating(RechokePeriod);
refill_upkeep_timer_->start_repeating(RefillUpkeepPeriod);
}
@ -835,7 +863,7 @@ struct tr_peerMgr
void rechokePulse() const;
void reconnectPulse();
void refillUpkeep() const;
void makeNewPeerConnections(size_t max);
void make_new_peer_connections();
[[nodiscard]] tr_swarm* get_existing_swarm(tr_sha1_digest_t const& hash) const
{
@ -868,22 +896,13 @@ private:
}
}
OutboundCandidates outbound_candidates_;
std::unique_ptr<libtransmission::Timer> const bandwidth_timer_;
std::unique_ptr<libtransmission::Timer> const rechoke_timer_;
std::unique_ptr<libtransmission::Timer> const refill_upkeep_timer_;
libtransmission::ObserverTag const blocklist_tag_;
static auto constexpr BandwidthPeriod = 500ms;
static auto constexpr RechokePeriod = 10s;
static auto constexpr RefillUpkeepPeriod = 10s;
// how frequently to decide which peers live and die
static auto constexpr ReconnectPeriodMsec = int{ 500 };
// max number of peers to ask for per second overall.
// this throttle is to avoid overloading the router
static auto constexpr MaxConnectionsPerSecond = size_t{ 12 };
};
// --- tr_peer virtual functions
@ -2172,8 +2191,7 @@ void tr_peerMgr::reconnectPulse()
enforceSessionPeerLimit(session);
// try to make new peer connections
auto const max_connections_per_pulse = int(MaxConnectionsPerSecond * (ReconnectPeriodMsec / 1000.0));
makeNewPeerConnections(max_connections_per_pulse);
make_new_peer_connections();
}
// --- Bandwidth Allocation
@ -2224,7 +2242,7 @@ void tr_peerMgr::bandwidthPulse()
pumpAllPeers(this);
// allocate bandwidth to the peers
static auto constexpr Msec = std::chrono::duration_cast<std::chrono::milliseconds>(BandwidthPeriod).count();
static auto constexpr Msec = std::chrono::duration_cast<std::chrono::milliseconds>(BandwidthTimerPeriod).count();
session->top_bandwidth_.allocate(Msec);
// torrent upkeep
@ -2297,6 +2315,15 @@ namespace connect_helpers
struct peer_candidate
{
peer_candidate() = default;
peer_candidate(uint64_t score_in, tr_torrent* tor_in, peer_atom* atom_in)
: score{ score_in }
, tor{ tor_in }
, atom{ atom_in }
{
}
uint64_t score;
tr_torrent* tor;
peer_atom* atom;
@ -2373,8 +2400,7 @@ struct peer_candidate
return score;
}
/** @return an array of all the atoms we might want to connect to */
[[nodiscard]] std::vector<peer_candidate> getPeerCandidates(tr_session* session, size_t max)
[[nodiscard]] tr_peerMgr::OutboundCandidates get_peer_candidates(tr_session* session)
{
auto const now = tr_time();
auto const now_msec = tr_time_msec();
@ -2423,13 +2449,13 @@ struct peer_candidate
{
if (isPeerCandidate(tor, atom, now))
{
candidates.push_back({ getPeerCandidateScore(tor, atom, salter()), tor, &atom });
candidates.emplace_back(getPeerCandidateScore(tor, atom, salter()), tor, &atom);
}
}
}
// only keep the best `max` candidates
if (std::size(candidates) > max)
if (auto const max = tr_peerMgr::OutboundCandidates::requested_inline_size; max < std::size(candidates))
{
std::partial_sort(
std::begin(candidates),
@ -2439,7 +2465,12 @@ struct peer_candidate
candidates.resize(max);
}
return candidates;
auto ret = tr_peerMgr::OutboundCandidates{};
for (auto const& candidate : candidates)
{
ret.emplace_back(candidate.tor->id(), candidate.atom->socket_address);
}
return ret;
}
void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, peer_atom& atom)
@ -2498,16 +2529,34 @@ void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, peer_atom& atom)
} // namespace connect_helpers
} // namespace
void tr_peerMgr::makeNewPeerConnections(size_t max)
void tr_peerMgr::make_new_peer_connections()
{
using namespace connect_helpers;
auto const lock = session->unique_lock();
for (auto& candidate : getPeerCandidates(session, max))
// get the candidates if we need to
auto& peers = outbound_candidates_;
if (std::empty(peers))
{
initiateConnection(this, candidate.tor->swarm, *candidate.atom);
peers = get_peer_candidates(session);
}
// initiate connections to the first N candidates
auto const n_this_pass = std::min(std::size(peers), MaxConnectionsPerPulse);
for (size_t i = 0; i < n_this_pass; ++i)
{
auto const& [tor_id, sock_addr] = peers[i];
auto* const tor = session->torrents().get(tor_id);
auto* const atom = tor->swarm->get_existing_atom(sock_addr);
if (tor != nullptr && atom != nullptr)
{
initiateConnection(this, tor->swarm, *atom);
}
}
// remove the first N candidates from the list
peers.erase(std::begin(peers), std::begin(peers) + n_this_pass);
}
// ---