refactor: do not prune peer atoms (#3192)

* refactor; do not prune peer atoms

* feat: improve connectivity check for peer candidates
This commit is contained in:
Charles Kerr 2022-06-04 10:29:24 -05:00 committed by GitHub
parent 4906349e3a
commit 7f1e12f220
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 70 additions and 163 deletions

View File

@ -10,9 +10,11 @@
#include <cstdint> #include <cstdint>
#include <cstdlib> /* qsort */ #include <cstdlib> /* qsort */
#include <ctime> // time_t #include <ctime> // time_t
#include <deque>
#include <iterator> // std::back_inserter #include <iterator> // std::back_inserter
#include <memory> #include <memory>
#include <numeric> // std::accumulate #include <numeric> // std::accumulate
#include <optional>
#include <tuple> // std::tie #include <tuple> // std::tie
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -47,9 +49,6 @@
#include "utils.h" #include "utils.h"
#include "webseed.h" #include "webseed.h"
// how frequently to cull old atoms
static auto constexpr AtomPeriodMsec = int{ 60 * 1000 };
// how frequently to change which peers are choked // how frequently to change which peers are choked
static auto constexpr RechokePeriodMsec = int{ 10 * 1000 }; static auto constexpr RechokePeriodMsec = int{ 10 * 1000 };
@ -213,6 +212,21 @@ struct peer_atom
blocklisted_.reset(); blocklisted_.reset();
} }
std::optional<bool> isReachable() const
{
if ((flags2 & MyflagUnreachable) != 0)
{
return false;
}
if ((flags & ADDED_F_CONNECTABLE) != 0)
{
return true;
}
return std::nullopt;
}
tr_address const addr; tr_address const addr;
tr_port port = {}; tr_port port = {};
@ -324,7 +338,11 @@ public:
std::vector<std::unique_ptr<tr_peer>> webseeds; std::vector<std::unique_ptr<tr_peer>> webseeds;
std::vector<tr_peerMsgs*> peers; std::vector<tr_peerMsgs*> peers;
std::vector<std::unique_ptr<peer_atom>> pool;
// tr_peers hold pointers to the items in this container,
// so use a deque instead of vector to prevent insertion from
// invalidating those pointers
std::deque<peer_atom> pool;
tr_torrent* const tor; tr_torrent* const tor;
@ -352,7 +370,6 @@ struct tr_peerMgr
event* bandwidthTimer = nullptr; event* bandwidthTimer = nullptr;
event* rechokeTimer = nullptr; event* rechokeTimer = nullptr;
event* refillUpkeepTimer = nullptr; event* refillUpkeepTimer = nullptr;
event* atomTimer = nullptr;
}; };
#define tr_logAddDebugSwarm(swarm, msg) tr_logAddDebugTor((swarm)->tor, msg) #define tr_logAddDebugSwarm(swarm, msg) tr_logAddDebugTor((swarm)->tor, msg)
@ -407,10 +424,10 @@ static struct peer_atom* getExistingAtom(tr_swarm const* cswarm, tr_address cons
auto* swarm = const_cast<tr_swarm*>(cswarm); auto* swarm = const_cast<tr_swarm*>(cswarm);
auto const test = [&addr](auto const& atom) auto const test = [&addr](auto const& atom)
{ {
return atom->addr == addr; return atom.addr == addr;
}; };
auto const it = std::find_if(std::begin(swarm->pool), std::end(swarm->pool), test); auto const it = std::find_if(std::begin(swarm->pool), std::end(swarm->pool), test);
return it != std::end(swarm->pool) ? it->get() : nullptr; return it != std::end(swarm->pool) ? &*it : nullptr;
} }
static bool peerIsInUse(tr_swarm const* cs, struct peer_atom const* atom) static bool peerIsInUse(tr_swarm const* cs, struct peer_atom const* atom)
@ -482,7 +499,6 @@ static void deleteTimer(struct event** t)
static void deleteTimers(struct tr_peerMgr* m) static void deleteTimers(struct tr_peerMgr* m)
{ {
deleteTimer(&m->atomTimer);
deleteTimer(&m->bandwidthTimer); deleteTimer(&m->bandwidthTimer);
deleteTimer(&m->rechokeTimer); deleteTimer(&m->rechokeTimer);
deleteTimer(&m->refillUpkeepTimer); deleteTimer(&m->refillUpkeepTimer);
@ -511,7 +527,7 @@ void tr_peerMgrOnBlocklistChanged(tr_peerMgr* mgr)
{ {
for (auto& atom : tor->swarm->pool) for (auto& atom : tor->swarm->pool)
{ {
atom->setBlocklistedDirty(); atom.setBlocklistedDirty();
} }
} }
} }
@ -939,10 +955,7 @@ static struct peer_atom* ensureAtomExists(
if (a == nullptr) if (a == nullptr)
{ {
auto atom = std::make_unique<peer_atom>(addr, port, flags, from); a = &s->pool.emplace_back(addr, port, flags, from);
a = atom.get();
auto const it = std::lower_bound(std::begin(s->pool), std::end(s->pool), atom);
s->pool.insert(it, std::move(atom));
} }
else else
{ {
@ -1121,7 +1134,7 @@ void tr_peerMgrSetSwarmIsAllSeeds(tr_torrent* tor)
for (auto& atom : swarm->pool) for (auto& atom : swarm->pool)
{ {
atomSetSeed(swarm, *atom); atomSetSeed(swarm, atom);
} }
swarm->pool_is_all_seeds = true; swarm->pool_is_all_seeds = true;
@ -1303,9 +1316,9 @@ std::vector<tr_pex> tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t af, uint8_
{ {
for (auto const& atom : s->pool) for (auto const& atom : s->pool)
{ {
if (isAtomInteresting(tor, *atom)) if (isAtomInteresting(tor, atom))
{ {
atoms.push_back(atom.get()); atoms.push_back(&atom);
} }
} }
} }
@ -1335,7 +1348,6 @@ std::vector<tr_pex> tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t af, uint8_
return pex; return pex;
} }
static void atomPulse(evutil_socket_t, short /*unused*/, void* /*vmgr*/);
static void bandwidthPulse(evutil_socket_t, short /*unused*/, void* /*vmgr*/); static void bandwidthPulse(evutil_socket_t, short /*unused*/, void* /*vmgr*/);
static void rechokePulse(evutil_socket_t, short /*unused*/, void* /*vmgr*/); static void rechokePulse(evutil_socket_t, short /*unused*/, void* /*vmgr*/);
static void reconnectPulse(evutil_socket_t, short /*unused*/, void* /*vmgr*/); static void reconnectPulse(evutil_socket_t, short /*unused*/, void* /*vmgr*/);
@ -1349,11 +1361,6 @@ static struct event* createTimer(tr_session* session, int msec, event_callback_f
static void ensureMgrTimersExist(struct tr_peerMgr* m) static void ensureMgrTimersExist(struct tr_peerMgr* m)
{ {
if (m->atomTimer == nullptr)
{
m->atomTimer = createTimer(m->session, AtomPeriodMsec, atomPulse, m);
}
if (m->bandwidthTimer == nullptr) if (m->bandwidthTimer == nullptr)
{ {
m->bandwidthTimer = createTimer(m->session, BandwidthPeriodMsec, bandwidthPulse, m); m->bandwidthTimer = createTimer(m->session, BandwidthPeriodMsec, bandwidthPulse, m);
@ -2502,110 +2509,6 @@ static void bandwidthPulse(evutil_socket_t /*fd*/, short /*what*/, void* vmgr)
tr_timerAddMsec(*mgr->bandwidthTimer, BandwidthPeriodMsec); tr_timerAddMsec(*mgr->bandwidthTimer, BandwidthPeriodMsec);
} }
/***
****
***/
#if 0
static auto getMaxAtomCount(tr_torrent const* tor)
{
static auto constexpr Limit = uint16_t{ 50 };
static auto constexpr Multiplier = uint16_t{ 3 };
return std::min(Limit, static_cast<uint16_t>(tor->max_connected_peers * Multiplier));
}
struct CompareAtomsByActivity
{
CompareAtomsByActivity(tr_swarm const& swarm)
: swarm_{ swarm }
{
}
[[nodiscard]] int compare(peer_atom const& a, peer_atom const& b) const
{
// primary key: peers in use go first
auto const a_use = peerIsInUse(&swarm_, &a);
auto const b_use = peerIsInUse(&swarm_, &b);
if (a_use != b_use)
{
return a_use ? -1 : 1;
}
auto constexpr data_time_cutoff_secs = int{ 60 * 60 };
auto const tr_now = tr_time();
// secondary key: the last piece data time *if* it was within the last hour
time_t atime = a.piece_data_time;
if (atime + data_time_cutoff_secs < tr_now)
{
atime = 0;
}
time_t btime = b.piece_data_time;
if (btime + data_time_cutoff_secs < tr_now)
{
btime = 0;
}
if (atime != btime)
{
return atime > btime ? -1 : 1;
}
return 0;
}
[[nodiscard]] int compare(std::unique_ptr<peer_atom> const& a, std::unique_ptr<peer_atom> const& b) const noexcept
{
return compare(*a, *b);
}
[[nodiscard]] bool operator()(std::unique_ptr<peer_atom> const& a, std::unique_ptr<peer_atom> const& b) const noexcept
{
return compare(*a, *b) < 0;
}
private:
tr_swarm const& swarm_;
};
#endif
static void atomPulse(evutil_socket_t /*fd*/, short /*what*/, void* vmgr)
{
auto* mgr = static_cast<tr_peerMgr*>(vmgr);
auto const lock = mgr->unique_lock();
#if 0
for (auto* const tor : mgr->session->torrents())
{
// do we have too many atoms?
auto& atoms = tor->swarm->pool;
auto const max_atom_count = getMaxAtomCount(tor);
auto const atom_count = std::size(atoms);
if (atom_count <= max_atom_count)
{
continue;
}
// we've got too many... prune the oldest / least active
std::partial_sort(
std::begin(atoms),
std::begin(atoms) + max_atom_count,
std::end(atoms),
CompareAtomsByActivity{ *tor->swarm });
atoms.resize(max_atom_count);
tr_logAddTraceSwarm(
tor->swarm,
fmt::format("max atom count is {}... pruned from {} to {}", max_atom_count, atom_count, std::size(atoms)));
}
#endif
tr_timerAddMsec(*mgr->atomTimer, AtomPeriodMsec);
}
/*** /***
**** ****
**** ****
@ -2613,34 +2516,40 @@ static void atomPulse(evutil_socket_t /*fd*/, short /*what*/, void* vmgr)
***/ ***/
/* is this atom someone that we'd want to initiate a connection to? */ /* is this atom someone that we'd want to initiate a connection to? */
static bool isPeerCandidate(tr_torrent const* tor, struct peer_atom* atom, time_t const now) static bool isPeerCandidate(tr_torrent const* tor, peer_atom const& atom, time_t const now)
{ {
/* not if we're both seeds */ // have we already tried and failed to connect?
if (tor->isDone() && atom->isSeed()) if (auto const reachable = atom.isReachable(); reachable && !*reachable)
{ {
return false; return false;
} }
/* not if we've already got a connection to them... */ // not if we're both seeds
if (peerIsInUse(tor->swarm, atom)) if (tor->isDone() && atom.isSeed())
{ {
return false; return false;
} }
/* not if we just tried them already */ // not if we've already got a connection to them...
if (now - atom->time < atom->getReconnectIntervalSecs(now)) if (peerIsInUse(tor->swarm, &atom))
{ {
return false; return false;
} }
/* not if they're blocklisted */ // not if we just tried them already
if (atom->isBlocklisted(tor->session)) if (now - atom.time < atom.getReconnectIntervalSecs(now))
{ {
return false; return false;
} }
/* not if they're banned... */ // not if they're blocklisted
if ((atom->flags2 & MyflagBanned) != 0) if (atom.isBlocklisted(tor->session))
{
return false;
}
// not if they're banned...
if ((atom.flags2 & MyflagBanned) != 0)
{ {
return false; return false;
} }
@ -2652,7 +2561,7 @@ struct peer_candidate
{ {
uint64_t score; uint64_t score;
tr_torrent* tor; tr_torrent* tor;
struct peer_atom* atom; peer_atom* atom;
}; };
static bool torrentWasRecentlyStarted(tr_torrent const* tor) static bool torrentWasRecentlyStarted(tr_torrent const* tor)
@ -2668,18 +2577,18 @@ static constexpr uint64_t addValToKey(uint64_t value, int width, uint64_t addme)
} }
/* smaller value is better */ /* smaller value is better */
static uint64_t getPeerCandidateScore(tr_torrent const* tor, struct peer_atom const* atom, uint8_t salt) static uint64_t getPeerCandidateScore(tr_torrent const* tor, peer_atom const& atom, uint8_t salt)
{ {
auto i = uint64_t{}; auto i = uint64_t{};
auto score = uint64_t{}; auto score = uint64_t{};
bool const failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt; bool const failed = atom.lastConnectionAt < atom.lastConnectionAttemptAt;
/* prefer peers we've connected to, or never tried, over peers we failed to connect to. */ /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
i = failed ? 1 : 0; i = failed ? 1 : 0;
score = addValToKey(score, 1, i); score = addValToKey(score, 1, i);
/* prefer the one we attempted least recently (to cycle through all peers) */ /* prefer the one we attempted least recently (to cycle through all peers) */
i = atom->lastConnectionAttemptAt; i = atom.lastConnectionAttemptAt;
score = addValToKey(score, 32, i); score = addValToKey(score, 32, i);
/* prefer peers belonging to a torrent of a higher priority */ /* prefer peers belonging to a torrent of a higher priority */
@ -2709,16 +2618,16 @@ static uint64_t getPeerCandidateScore(tr_torrent const* tor, struct peer_atom co
score = addValToKey(score, 1, i); score = addValToKey(score, 1, i);
/* prefer peers that are known to be connectible */ /* prefer peers that are known to be connectible */
i = (atom->flags & ADDED_F_CONNECTABLE) != 0 ? 0 : 1; i = (atom.flags & ADDED_F_CONNECTABLE) != 0 ? 0 : 1;
score = addValToKey(score, 1, i); score = addValToKey(score, 1, i);
/* prefer peers that we might be able to upload to */ /* prefer peers that we might be able to upload to */
i = (atom->flags & ADDED_F_SEED_FLAG) == 0 ? 0 : 1; i = (atom.flags & ADDED_F_SEED_FLAG) == 0 ? 0 : 1;
score = addValToKey(score, 1, i); score = addValToKey(score, 1, i);
/* Prefer peers that we got from more trusted sources. /* Prefer peers that we got from more trusted sources.
* lower `fromBest' values indicate more trusted sources */ * lower `fromBest' values indicate more trusted sources */
score = addValToKey(score, 4, atom->fromBest); score = addValToKey(score, 4, atom.fromBest);
/* salt */ /* salt */
score = addValToKey(score, 8, salt); score = addValToKey(score, 8, salt);
@ -2730,7 +2639,7 @@ static bool calculateAllSeeds(tr_swarm* swarm)
{ {
static auto constexpr test = [](auto const& atom) static auto constexpr test = [](auto const& atom)
{ {
return atom->isSeed(); return atom.isSeed();
}; };
return std::all_of(std::begin(swarm->pool), std::end(swarm->pool), test); return std::all_of(std::begin(swarm->pool), std::end(swarm->pool), test);
} }
@ -2801,14 +2710,12 @@ static std::vector<peer_candidate> getPeerCandidates(tr_session* session, size_t
continue; continue;
} }
for (auto& atom_unique_ptr : tor->swarm->pool) for (auto& atom : tor->swarm->pool)
{ {
auto* atom = atom_unique_ptr.get();
if (isPeerCandidate(tor, atom, now)) if (isPeerCandidate(tor, atom, now))
{ {
uint8_t const salt = tr_rand_int_weak(1024); uint8_t const salt = tr_rand_int_weak(1024);
candidates.push_back({ getPeerCandidateScore(tor, atom, salt), tor, atom }); candidates.push_back({ getPeerCandidateScore(tor, atom, salt), tor, &atom });
} }
} }
} }
@ -2827,26 +2734,26 @@ static std::vector<peer_candidate> getPeerCandidates(tr_session* session, size_t
return candidates; return candidates;
} }
static void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, struct peer_atom* atom) static void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, peer_atom& atom)
{ {
time_t const now = tr_time(); time_t const now = tr_time();
bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed; bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom.utp_failed;
if (atom->fromFirst == TR_PEER_FROM_PEX) if (atom.fromFirst == TR_PEER_FROM_PEX)
{ {
/* PEX has explicit signalling for uTP support. If an atom /* PEX has explicit signalling for uTP support. If an atom
originally came from PEX and doesn't have the uTP flag, skip the originally came from PEX and doesn't have the uTP flag, skip the
uTP connection attempt. Are we being optimistic here? */ uTP connection attempt. Are we being optimistic here? */
utp = utp && (atom->flags & ADDED_F_UTP_FLAGS) != 0; utp = utp && (atom.flags & ADDED_F_UTP_FLAGS) != 0;
} }
tr_logAddTraceSwarm(s, fmt::format("Starting an OUTGOING {} connection with {}", utp ? " µTP" : "TCP", atom->readable())); tr_logAddTraceSwarm(s, fmt::format("Starting an OUTGOING {} connection with {}", utp ? " µTP" : "TCP", atom.readable()));
tr_peerIo* const io = tr_peerIoNewOutgoing( tr_peerIo* const io = tr_peerIoNewOutgoing(
mgr->session, mgr->session,
&mgr->session->top_bandwidth_, &mgr->session->top_bandwidth_,
&atom->addr, &atom.addr,
atom->port, atom.port,
tr_time(), tr_time(),
s->tor->infoHash(), s->tor->infoHash(),
s->tor->completeness == TR_SEED, s->tor->completeness == TR_SEED,
@ -2854,9 +2761,9 @@ static void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, struct peer_atom* a
if (io == nullptr) if (io == nullptr)
{ {
tr_logAddTraceSwarm(s, fmt::format("peerIo not created; marking peer {} as unreachable", atom->readable())); tr_logAddTraceSwarm(s, fmt::format("peerIo not created; marking peer {} as unreachable", atom.readable()));
atom->flags2 |= MyflagUnreachable; atom.flags2 |= MyflagUnreachable;
++atom->num_fails; ++atom.num_fails;
} }
else else
{ {
@ -2866,11 +2773,11 @@ static void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, struct peer_atom* a
tr_peerIoUnref(io); /* balanced by the initial ref in tr_peerIoNewOutgoing() */ tr_peerIoUnref(io); /* balanced by the initial ref in tr_peerIoNewOutgoing() */
s->outgoing_handshakes.add(atom->addr, handshake); s->outgoing_handshakes.add(atom.addr, handshake);
} }
atom->lastConnectionAttemptAt = now; atom.lastConnectionAttemptAt = now;
atom->time = now; atom.time = now;
} }
static void initiateCandidateConnection(tr_peerMgr* mgr, peer_candidate& c) static void initiateCandidateConnection(tr_peerMgr* mgr, peer_candidate& c)
@ -2883,7 +2790,7 @@ static void initiateCandidateConnection(tr_peerMgr* mgr, peer_candidate& c)
#endif #endif
initiateConnection(mgr, c.tor->swarm, c.atom); initiateConnection(mgr, c.tor->swarm, *c.atom);
} }
static void makeNewPeerConnections(struct tr_peerMgr* mgr, size_t max) static void makeNewPeerConnections(struct tr_peerMgr* mgr, size_t max)