refactor: simplify sorting peers by liveliness (#2050)

This commit is contained in:
Charles Kerr 2021-10-27 21:05:00 -05:00 committed by GitHub
parent 6bd229fcfd
commit 9be93a489e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 83 additions and 160 deletions

View File

@ -11,6 +11,7 @@
#include <climits> /* INT_MAX */
#include <cstdlib> /* qsort */
#include <cstring> /* memcpy, memcmp, strstr */
#include <iterator>
#include <vector>
#include <event2/event.h>
@ -3139,8 +3140,9 @@ static int getReconnectIntervalSecs(struct peer_atom const* atom, time_t const n
return sec;
}
static void removePeer(tr_swarm* s, tr_peer* peer)
static void removePeer(tr_peer* peer)
{
auto* const s = peer->swarm;
TR_ASSERT(swarmIsLocked(s));
struct peer_atom* atom = peer->atom;
@ -3158,12 +3160,11 @@ static void removePeer(tr_swarm* s, tr_peer* peer)
delete peer;
}
static void closePeer(tr_swarm* s, tr_peer* peer)
static void closePeer(tr_peer* peer)
{
TR_ASSERT(s != nullptr);
TR_ASSERT(peer != nullptr);
struct peer_atom* atom = peer->atom;
auto* const s = peer->swarm;
peer_atom* const atom = peer->atom;
/* if we transferred piece data, then they might be good peers,
so reset their `numFails' weight to zero. otherwise we connected
@ -3180,14 +3181,14 @@ static void closePeer(tr_swarm* s, tr_peer* peer)
}
tordbg(s, "removing bad peer %s", tr_atomAddrStr(peer->atom));
removePeer(s, peer);
removePeer(peer);
}
static void removeAllPeers(tr_swarm* s)
{
while (!tr_ptrArrayEmpty(&s->peers))
{
removePeer(s, static_cast<tr_peer*>(tr_ptrArrayNth(&s->peers, 0)));
removePeer(static_cast<tr_peer*>(tr_ptrArrayNth(&s->peers, 0)));
}
TR_ASSERT(s->stats.peerCount == 0);
@ -3202,162 +3203,89 @@ static void closeBadPeers(tr_swarm* s, time_t const now_sec)
for (int i = 0; i < peerCount; ++i)
{
closePeer(s, peers[i]);
closePeer(peers[i]);
}
tr_free(peers);
}
}
struct peer_liveliness
struct ComparePeerByActivity
{
tr_peer* peer;
void* clientData;
time_t pieceDataTime;
time_t time;
unsigned int speed;
bool doPurge;
int compare(tr_peer const* a, tr_peer const* b) const // <=>
{
if (a->doPurge != b->doPurge)
{
return a->doPurge ? 1 : -1;
}
/* the one to give us data more recently goes first */
if (a->atom->piece_data_time != b->atom->piece_data_time)
{
return a->atom->piece_data_time > b->atom->piece_data_time ? -1 : 1;
}
/* the one we connected to most recently goes first */
if (a->atom->time != b->atom->time)
{
return a->atom->time > b->atom->time ? -1 : 1;
}
return 0;
}
bool operator()(tr_peer const* a, tr_peer const* b) const // less then
{
return compare(a, b) < 0;
}
};
static int comparePeerLiveliness(void const* va, void const* vb)
{
auto const* const a = static_cast<struct peer_liveliness const*>(va);
auto const* const b = static_cast<struct peer_liveliness const*>(vb);
if (a->doPurge != b->doPurge)
{
return a->doPurge ? 1 : -1;
}
if (a->speed != b->speed) /* faster goes first */
{
return a->speed > b->speed ? -1 : 1;
}
/* the one to give us data more recently goes first */
if (a->pieceDataTime != b->pieceDataTime)
{
return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
}
/* the one we connected to most recently goes first */
if (a->time != b->time)
{
return a->time > b->time ? -1 : 1;
}
return 0;
}
static void sortPeersByLivelinessImpl(tr_peer** peers, void** clientData, int n, uint64_t now, tr_voidptr_compare_func compare)
{
// build a sortable array of peer + extra info
struct peer_liveliness* lives = tr_new0(struct peer_liveliness, n);
for (int i = 0; i < n; ++i)
{
tr_peer* p = peers[i];
struct peer_liveliness* const l = &lives[i];
l->peer = p;
l->doPurge = p->doPurge;
l->pieceDataTime = p->atom->piece_data_time;
l->time = p->atom->time;
l->speed = tr_peerGetPieceSpeed_Bps(p, now, TR_UP) + tr_peerGetPieceSpeed_Bps(p, now, TR_DOWN);
if (clientData != nullptr)
{
l->clientData = clientData[i];
}
}
// sort 'em
qsort(lives, n, sizeof(struct peer_liveliness), compare);
// build the peer array
for (int i = 0; i < n; ++i)
{
struct peer_liveliness* const l = &lives[i];
peers[i] = l->peer;
if (clientData != nullptr)
{
clientData[i] = l->clientData;
}
}
// cleanup
tr_free(lives);
}
static void sortPeersByLiveliness(tr_peer** peers, void** clientData, int n, uint64_t now)
{
sortPeersByLivelinessImpl(peers, clientData, n, now, comparePeerLiveliness);
}
static void enforceTorrentPeerLimit(tr_swarm* s, uint64_t now)
static void enforceTorrentPeerLimit(tr_swarm* s)
{
// do we have too many peers?
int n = tr_ptrArraySize(&s->peers);
int const max = tr_torrentGetPeerLimit(s->tor);
if (n > max)
if (n <= max)
{
void const* const base = tr_ptrArrayBase(&s->peers);
auto** const peers = static_cast<tr_peer**>(tr_memdup(base, n * sizeof(tr_peer*)));
sortPeersByLiveliness(peers, nullptr, n, now);
while (n > max)
{
closePeer(s, peers[--n]);
}
tr_free(peers);
return;
}
// close all but the `max` most active
auto peers = std::vector<tr_peer*>{};
peers.reserve(n);
auto** base = (tr_peer**)tr_ptrArrayBase(&s->peers);
std::copy_n(base, n, std::back_inserter(peers));
std::partial_sort(std::begin(peers), std::begin(peers) + max, std::end(peers), ComparePeerByActivity{});
std::for_each(std::begin(peers) + max, std::end(peers), closePeer);
}
static void enforceSessionPeerLimit(tr_session* session, uint64_t now)
static void enforceSessionPeerLimit(tr_session* session)
{
/* count the total number of peers */
int n = 0;
for (auto const* tor : session->torrents)
// do we have too many peers?
size_t const n_peers = std::accumulate(
std::begin(session->torrents),
std::end(session->torrents),
size_t{},
[](size_t sum, tr_torrent* tor) { return sum + tr_ptrArraySize(&tor->swarm->peers); });
size_t const max = tr_sessionGetPeerLimit(session);
if (n_peers <= max)
{
n += tr_ptrArraySize(&tor->swarm->peers);
return;
}
/* if there are too many, prune out the worst */
int const max = tr_sessionGetPeerLimit(session);
if (n > max)
// make a list of all the peers
auto peers = std::vector<tr_peer*>{};
peers.reserve(n_peers);
for (auto* tor : session->torrents)
{
tr_peer** peers = tr_new(tr_peer*, n);
tr_swarm** swarms = tr_new(tr_swarm*, n);
/* populate the peer array */
n = 0;
for (auto* tor : session->torrents)
{
tr_swarm* s = tor->swarm;
for (int i = 0, tn = tr_ptrArraySize(&s->peers); i < tn; ++i)
{
peers[n] = static_cast<tr_peer*>(tr_ptrArrayNth(&s->peers, i));
swarms[n] = s;
++n;
}
}
/* sort 'em */
sortPeersByLiveliness(peers, (void**)swarms, n, now);
/* cull out the crappiest */
while (n-- > max)
{
closePeer(swarms[n], peers[n]);
}
/* cleanup */
tr_free(swarms);
tr_free(peers);
size_t const n = tr_ptrArraySize(&tor->swarm->peers);
auto** base = (tr_peer**)tr_ptrArrayBase(&tor->swarm->peers);
std::copy_n(base, n, std::back_inserter(peers));
}
// close all but the `max` most active
std::partial_sort(std::begin(peers), std::begin(peers) + max, std::end(peers), ComparePeerByActivity{});
std::for_each(std::begin(peers) + max, std::end(peers), closePeer);
}
static void makeNewPeerConnections(tr_peerMgr* mgr, size_t max);
@ -3366,25 +3294,8 @@ static void reconnectPulse(evutil_socket_t /*fd*/, short /*what*/, void* vmgr)
{
auto* mgr = static_cast<tr_peerMgr*>(vmgr);
time_t const now_sec = tr_time();
uint64_t const now_msec = tr_time_msec();
/**
*** enforce the per-session and per-torrent peer limits
**/
/* if we're over the per-torrent peer limits, cull some peers */
for (auto* tor : mgr->session->torrents)
{
if (tor->isRunning)
{
enforceTorrentPeerLimit(tor->swarm, now_msec);
}
}
/* if we're over the per-session peer limits, cull some peers */
enforceSessionPeerLimit(mgr->session, now_msec);
/* remove crappy peers */
// remove crappy peers
for (auto* tor : mgr->session->torrents)
{
if (!tor->swarm->isRunning)
@ -3397,7 +3308,19 @@ static void reconnectPulse(evutil_socket_t /*fd*/, short /*what*/, void* vmgr)
}
}
/* try to make new peer connections */
// if we're over the per-torrent peer limits, cull some peers
for (auto* tor : mgr->session->torrents)
{
if (tor->isRunning)
{
enforceTorrentPeerLimit(tor->swarm);
}
}
// if we're over the per-session peer limits, cull some peers
enforceSessionPeerLimit(mgr->session);
// try to make new peer connections
int const MaxConnectionsPerPulse = (int)(MaxConnectionsPerSecond * (ReconnectPeriodMsec / 1000.0));
makeNewPeerConnections(mgr, MaxConnectionsPerPulse);
}