diff --git a/libtransmission/peer-mgr.cc b/libtransmission/peer-mgr.cc index 98b9d34a3..330857332 100644 --- a/libtransmission/peer-mgr.cc +++ b/libtransmission/peer-mgr.cc @@ -10,9 +10,11 @@ #include #include /* qsort */ #include // time_t +#include #include // std::back_inserter #include #include // std::accumulate +#include #include // std::tie #include #include @@ -47,9 +49,6 @@ #include "utils.h" #include "webseed.h" -// how frequently to cull old atoms -static auto constexpr AtomPeriodMsec = int{ 60 * 1000 }; - // how frequently to change which peers are choked static auto constexpr RechokePeriodMsec = int{ 10 * 1000 }; @@ -213,6 +212,21 @@ struct peer_atom blocklisted_.reset(); } + std::optional isReachable() const + { + if ((flags2 & MyflagUnreachable) != 0) + { + return false; + } + + if ((flags & ADDED_F_CONNECTABLE) != 0) + { + return true; + } + + return std::nullopt; + } + tr_address const addr; tr_port port = {}; @@ -324,7 +338,11 @@ public: std::vector> webseeds; std::vector peers; - std::vector> pool; + + // tr_peers hold pointers to the items in this container, + // so use a deque instead of vector to prevent insertion from + // invalidating those pointers + std::deque pool; tr_torrent* const tor; @@ -352,7 +370,6 @@ struct tr_peerMgr event* bandwidthTimer = nullptr; event* rechokeTimer = nullptr; event* refillUpkeepTimer = nullptr; - event* atomTimer = nullptr; }; #define tr_logAddDebugSwarm(swarm, msg) tr_logAddDebugTor((swarm)->tor, msg) @@ -407,10 +424,10 @@ static struct peer_atom* getExistingAtom(tr_swarm const* cswarm, tr_address cons auto* swarm = const_cast(cswarm); auto const test = [&addr](auto const& atom) { - return atom->addr == addr; + return atom.addr == addr; }; auto const it = std::find_if(std::begin(swarm->pool), std::end(swarm->pool), test); - return it != std::end(swarm->pool) ? it->get() : nullptr; + return it != std::end(swarm->pool) ? &*it : nullptr; } static bool peerIsInUse(tr_swarm const* cs, struct peer_atom const* atom) @@ -482,7 +499,6 @@ static void deleteTimer(struct event** t) static void deleteTimers(struct tr_peerMgr* m) { - deleteTimer(&m->atomTimer); deleteTimer(&m->bandwidthTimer); deleteTimer(&m->rechokeTimer); deleteTimer(&m->refillUpkeepTimer); @@ -511,7 +527,7 @@ void tr_peerMgrOnBlocklistChanged(tr_peerMgr* mgr) { for (auto& atom : tor->swarm->pool) { - atom->setBlocklistedDirty(); + atom.setBlocklistedDirty(); } } } @@ -939,10 +955,7 @@ static struct peer_atom* ensureAtomExists( if (a == nullptr) { - auto atom = std::make_unique(addr, port, flags, from); - a = atom.get(); - auto const it = std::lower_bound(std::begin(s->pool), std::end(s->pool), atom); - s->pool.insert(it, std::move(atom)); + a = &s->pool.emplace_back(addr, port, flags, from); } else { @@ -1121,7 +1134,7 @@ void tr_peerMgrSetSwarmIsAllSeeds(tr_torrent* tor) for (auto& atom : swarm->pool) { - atomSetSeed(swarm, *atom); + atomSetSeed(swarm, atom); } swarm->pool_is_all_seeds = true; @@ -1303,9 +1316,9 @@ std::vector tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t af, uint8_ { for (auto const& atom : s->pool) { - if (isAtomInteresting(tor, *atom)) + if (isAtomInteresting(tor, atom)) { - atoms.push_back(atom.get()); + atoms.push_back(&atom); } } } @@ -1335,7 +1348,6 @@ std::vector tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t af, uint8_ return pex; } -static void atomPulse(evutil_socket_t, short /*unused*/, void* /*vmgr*/); static void bandwidthPulse(evutil_socket_t, short /*unused*/, void* /*vmgr*/); static void rechokePulse(evutil_socket_t, short /*unused*/, void* /*vmgr*/); static void reconnectPulse(evutil_socket_t, short /*unused*/, void* /*vmgr*/); @@ -1349,11 +1361,6 @@ static struct event* createTimer(tr_session* session, int msec, event_callback_f static void ensureMgrTimersExist(struct tr_peerMgr* m) { - if (m->atomTimer == nullptr) - { - m->atomTimer = createTimer(m->session, AtomPeriodMsec, atomPulse, m); - } - if (m->bandwidthTimer == nullptr) { m->bandwidthTimer = createTimer(m->session, BandwidthPeriodMsec, bandwidthPulse, m); @@ -2502,110 +2509,6 @@ static void bandwidthPulse(evutil_socket_t /*fd*/, short /*what*/, void* vmgr) tr_timerAddMsec(*mgr->bandwidthTimer, BandwidthPeriodMsec); } -/*** -**** -***/ - -#if 0 -static auto getMaxAtomCount(tr_torrent const* tor) -{ - static auto constexpr Limit = uint16_t{ 50 }; - static auto constexpr Multiplier = uint16_t{ 3 }; - - return std::min(Limit, static_cast(tor->max_connected_peers * Multiplier)); -} - -struct CompareAtomsByActivity -{ - CompareAtomsByActivity(tr_swarm const& swarm) - : swarm_{ swarm } - { - } - - [[nodiscard]] int compare(peer_atom const& a, peer_atom const& b) const - { - // primary key: peers in use go first - auto const a_use = peerIsInUse(&swarm_, &a); - auto const b_use = peerIsInUse(&swarm_, &b); - if (a_use != b_use) - { - return a_use ? -1 : 1; - } - - auto constexpr data_time_cutoff_secs = int{ 60 * 60 }; - auto const tr_now = tr_time(); - - // secondary key: the last piece data time *if* it was within the last hour - time_t atime = a.piece_data_time; - - if (atime + data_time_cutoff_secs < tr_now) - { - atime = 0; - } - - time_t btime = b.piece_data_time; - - if (btime + data_time_cutoff_secs < tr_now) - { - btime = 0; - } - - if (atime != btime) - { - return atime > btime ? -1 : 1; - } - - return 0; - } - - [[nodiscard]] int compare(std::unique_ptr const& a, std::unique_ptr const& b) const noexcept - { - return compare(*a, *b); - } - - [[nodiscard]] bool operator()(std::unique_ptr const& a, std::unique_ptr const& b) const noexcept - { - return compare(*a, *b) < 0; - } - -private: - tr_swarm const& swarm_; -}; -#endif - -static void atomPulse(evutil_socket_t /*fd*/, short /*what*/, void* vmgr) -{ - auto* mgr = static_cast(vmgr); - auto const lock = mgr->unique_lock(); - -#if 0 - for (auto* const tor : mgr->session->torrents()) - { - // do we have too many atoms? - auto& atoms = tor->swarm->pool; - auto const max_atom_count = getMaxAtomCount(tor); - auto const atom_count = std::size(atoms); - if (atom_count <= max_atom_count) - { - continue; - } - - // we've got too many... prune the oldest / least active - std::partial_sort( - std::begin(atoms), - std::begin(atoms) + max_atom_count, - std::end(atoms), - CompareAtomsByActivity{ *tor->swarm }); - atoms.resize(max_atom_count); - tr_logAddTraceSwarm( - tor->swarm, - fmt::format("max atom count is {}... pruned from {} to {}", max_atom_count, atom_count, std::size(atoms))); - } -#endif - - tr_timerAddMsec(*mgr->atomTimer, AtomPeriodMsec); -} - /*** **** **** @@ -2613,34 +2516,40 @@ static void atomPulse(evutil_socket_t /*fd*/, short /*what*/, void* vmgr) ***/ /* is this atom someone that we'd want to initiate a connection to? */ -static bool isPeerCandidate(tr_torrent const* tor, struct peer_atom* atom, time_t const now) +static bool isPeerCandidate(tr_torrent const* tor, peer_atom const& atom, time_t const now) { - /* not if we're both seeds */ - if (tor->isDone() && atom->isSeed()) + // have we already tried and failed to connect? + if (auto const reachable = atom.isReachable(); reachable && !*reachable) { return false; } - /* not if we've already got a connection to them... */ - if (peerIsInUse(tor->swarm, atom)) + // not if we're both seeds + if (tor->isDone() && atom.isSeed()) { return false; } - /* not if we just tried them already */ - if (now - atom->time < atom->getReconnectIntervalSecs(now)) + // not if we've already got a connection to them... + if (peerIsInUse(tor->swarm, &atom)) { return false; } - /* not if they're blocklisted */ - if (atom->isBlocklisted(tor->session)) + // not if we just tried them already + if (now - atom.time < atom.getReconnectIntervalSecs(now)) { return false; } - /* not if they're banned... */ - if ((atom->flags2 & MyflagBanned) != 0) + // not if they're blocklisted + if (atom.isBlocklisted(tor->session)) + { + return false; + } + + // not if they're banned... + if ((atom.flags2 & MyflagBanned) != 0) { return false; } @@ -2652,7 +2561,7 @@ struct peer_candidate { uint64_t score; tr_torrent* tor; - struct peer_atom* atom; + peer_atom* atom; }; static bool torrentWasRecentlyStarted(tr_torrent const* tor) @@ -2668,18 +2577,18 @@ static constexpr uint64_t addValToKey(uint64_t value, int width, uint64_t addme) } /* smaller value is better */ -static uint64_t getPeerCandidateScore(tr_torrent const* tor, struct peer_atom const* atom, uint8_t salt) +static uint64_t getPeerCandidateScore(tr_torrent const* tor, peer_atom const& atom, uint8_t salt) { auto i = uint64_t{}; auto score = uint64_t{}; - bool const failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt; + bool const failed = atom.lastConnectionAt < atom.lastConnectionAttemptAt; /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */ i = failed ? 1 : 0; score = addValToKey(score, 1, i); /* prefer the one we attempted least recently (to cycle through all peers) */ - i = atom->lastConnectionAttemptAt; + i = atom.lastConnectionAttemptAt; score = addValToKey(score, 32, i); /* prefer peers belonging to a torrent of a higher priority */ @@ -2709,16 +2618,16 @@ static uint64_t getPeerCandidateScore(tr_torrent const* tor, struct peer_atom co score = addValToKey(score, 1, i); /* prefer peers that are known to be connectible */ - i = (atom->flags & ADDED_F_CONNECTABLE) != 0 ? 0 : 1; + i = (atom.flags & ADDED_F_CONNECTABLE) != 0 ? 0 : 1; score = addValToKey(score, 1, i); /* prefer peers that we might be able to upload to */ - i = (atom->flags & ADDED_F_SEED_FLAG) == 0 ? 0 : 1; + i = (atom.flags & ADDED_F_SEED_FLAG) == 0 ? 0 : 1; score = addValToKey(score, 1, i); /* Prefer peers that we got from more trusted sources. * lower `fromBest' values indicate more trusted sources */ - score = addValToKey(score, 4, atom->fromBest); + score = addValToKey(score, 4, atom.fromBest); /* salt */ score = addValToKey(score, 8, salt); @@ -2730,7 +2639,7 @@ static bool calculateAllSeeds(tr_swarm* swarm) { static auto constexpr test = [](auto const& atom) { - return atom->isSeed(); + return atom.isSeed(); }; return std::all_of(std::begin(swarm->pool), std::end(swarm->pool), test); } @@ -2801,14 +2710,12 @@ static std::vector getPeerCandidates(tr_session* session, size_t continue; } - for (auto& atom_unique_ptr : tor->swarm->pool) + for (auto& atom : tor->swarm->pool) { - auto* atom = atom_unique_ptr.get(); - if (isPeerCandidate(tor, atom, now)) { uint8_t const salt = tr_rand_int_weak(1024); - candidates.push_back({ getPeerCandidateScore(tor, atom, salt), tor, atom }); + candidates.push_back({ getPeerCandidateScore(tor, atom, salt), tor, &atom }); } } } @@ -2827,26 +2734,26 @@ static std::vector getPeerCandidates(tr_session* session, size_t return candidates; } -static void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, struct peer_atom* atom) +static void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, peer_atom& atom) { time_t const now = tr_time(); - bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed; + bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom.utp_failed; - if (atom->fromFirst == TR_PEER_FROM_PEX) + if (atom.fromFirst == TR_PEER_FROM_PEX) { /* PEX has explicit signalling for uTP support. If an atom originally came from PEX and doesn't have the uTP flag, skip the uTP connection attempt. Are we being optimistic here? */ - utp = utp && (atom->flags & ADDED_F_UTP_FLAGS) != 0; + utp = utp && (atom.flags & ADDED_F_UTP_FLAGS) != 0; } - tr_logAddTraceSwarm(s, fmt::format("Starting an OUTGOING {} connection with {}", utp ? " µTP" : "TCP", atom->readable())); + tr_logAddTraceSwarm(s, fmt::format("Starting an OUTGOING {} connection with {}", utp ? " µTP" : "TCP", atom.readable())); tr_peerIo* const io = tr_peerIoNewOutgoing( mgr->session, &mgr->session->top_bandwidth_, - &atom->addr, - atom->port, + &atom.addr, + atom.port, tr_time(), s->tor->infoHash(), s->tor->completeness == TR_SEED, @@ -2854,9 +2761,9 @@ static void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, struct peer_atom* a if (io == nullptr) { - tr_logAddTraceSwarm(s, fmt::format("peerIo not created; marking peer {} as unreachable", atom->readable())); - atom->flags2 |= MyflagUnreachable; - ++atom->num_fails; + tr_logAddTraceSwarm(s, fmt::format("peerIo not created; marking peer {} as unreachable", atom.readable())); + atom.flags2 |= MyflagUnreachable; + ++atom.num_fails; } else { @@ -2866,11 +2773,11 @@ static void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, struct peer_atom* a tr_peerIoUnref(io); /* balanced by the initial ref in tr_peerIoNewOutgoing() */ - s->outgoing_handshakes.add(atom->addr, handshake); + s->outgoing_handshakes.add(atom.addr, handshake); } - atom->lastConnectionAttemptAt = now; - atom->time = now; + atom.lastConnectionAttemptAt = now; + atom.time = now; } static void initiateCandidateConnection(tr_peerMgr* mgr, peer_candidate& c) @@ -2883,7 +2790,7 @@ static void initiateCandidateConnection(tr_peerMgr* mgr, peer_candidate& c) #endif - initiateConnection(mgr, c.tor->swarm, c.atom); + initiateConnection(mgr, c.tor->swarm, *c.atom); } static void makeNewPeerConnections(struct tr_peerMgr* mgr, size_t max)