refactor: add tr_peerMsgs.networkSocket() (#3175)

This commit is contained in:
Charles Kerr 2022-06-01 21:33:33 -05:00 committed by GitHub
parent f5e6c5b782
commit 75a7c5f080
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 50 additions and 33 deletions

View File

@ -76,6 +76,8 @@ public:
virtual bool is_transferring_pieces(uint64_t now, tr_direction direction, unsigned int* setme_Bps) const = 0;
[[nodiscard]] virtual std::string readable() const = 0;
/* whether or not we should free this peer soon.
NOTE: private to peer-mgr.c */
bool doPurge = false;

View File

@ -438,11 +438,6 @@ tr_peer::~tr_peer()
***
**/
tr_address const* tr_peerAddress(tr_peer const* peer)
{
return &peer->atom->addr;
}
static tr_swarm* getExistingSwarm(tr_peerMgr* manager, tr_sha1_digest_t const& hash)
{
auto* const tor = manager->session->torrents().get(hash);
@ -777,14 +772,13 @@ static void refillUpkeep(evutil_socket_t /*fd*/, short /*what*/, void* vmgr)
static void addStrike(tr_swarm* s, tr_peer* peer)
{
tr_logAddTraceSwarm(s, fmt::format("increasing peer {} strike count to {}", peer->atom->readable(), peer->strikes + 1));
tr_logAddTraceSwarm(s, fmt::format("increasing peer {} strike count to {}", peer->readable(), peer->strikes + 1));
if (++peer->strikes >= MaxBadPiecesPerPeer)
{
struct peer_atom* atom = peer->atom;
atom->flags2 |= MyflagBanned;
peer->atom->flags2 |= MyflagBanned;
peer->doPurge = true;
tr_logAddTraceSwarm(s, fmt::format("banning peer {}", atom->readable()));
tr_logAddTraceSwarm(s, fmt::format("banning peer {}", peer->readable()));
}
}
@ -875,8 +869,8 @@ static void peerCallbackFunc(tr_peer* peer, tr_peer_event const* e, void* vs)
{
case TR_PEER_PEER_GOT_PIECE_DATA:
{
time_t const now = tr_time();
tr_torrent* tor = s->tor;
auto const now = tr_time();
auto* const tor = s->tor;
tor->uploadedCur += e->length;
tr_announcerAddBytes(tor, TR_ANN_UP, e->length);
@ -894,8 +888,8 @@ static void peerCallbackFunc(tr_peer* peer, tr_peer_event const* e, void* vs)
case TR_PEER_CLIENT_GOT_PIECE_DATA:
{
time_t const now = tr_time();
tr_torrent* tor = s->tor;
auto const now = tr_time();
auto* const tor = s->tor;
tor->downloadedCur += e->length;
tor->setDateActive(now);
@ -960,9 +954,7 @@ static void peerCallbackFunc(tr_peer* peer, tr_peer_event const* e, void* vs)
peer->doPurge = true;
tr_logAddDebugSwarm(
s,
fmt::format(
"setting {} doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error",
peer->atom->readable()));
fmt::format("setting {} doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", peer->readable()));
}
else
{
@ -1261,7 +1253,7 @@ void tr_peerMgrGotBadPiece(tr_torrent* tor, tr_piece_index_t pieceIndex)
s,
fmt::format(
"peer {} contributed to corrupt piece ({}); now has {} strikes",
peer->atom->readable(),
peer->readable(),
pieceIndex,
peer->strikes + 1));
addStrike(s, peer);
@ -1672,9 +1664,11 @@ static auto getPeerStats(tr_peerMsgs const* peer, time_t now, uint64_t now_msec)
auto stats = tr_peer_stat{};
auto const* const atom = peer->atom;
tr_address_to_string_with_buf(&atom->addr, stats.addr, sizeof(stats.addr));
auto const [addr, port] = peer->socketAddress();
tr_address_to_string_with_buf(&addr, stats.addr, sizeof(stats.addr));
stats.client = peer->client.c_str();
stats.port = peer->atom->port.host();
stats.port = port.host();
stats.from = atom->fromFirst;
stats.progress = peer->progress;
stats.isUTP = peer->is_utp_connection();
@ -1938,7 +1932,7 @@ static void rechokeDownloads(tr_swarm* s)
if (peerCount > 0)
{
tr_torrent const* const tor = s->tor;
auto const* const tor = s->tor;
int const n = tor->pieceCount();
/* build a bitfield of interesting pieces... */
@ -2125,7 +2119,7 @@ static void rechokeUploads(tr_swarm* s, uint64_t const now)
/* sort the peers by preference and rate */
for (auto* const peer : peers)
{
peer_atom const* const atom = peer->atom;
auto const* const atom = peer->atom;
if (tr_peerIsSeed(peer))
{
@ -2250,16 +2244,16 @@ static void rechokePulse(evutil_socket_t /*fd*/, short /*what*/, void* vmgr)
static bool shouldPeerBeClosed(tr_swarm const* s, tr_peer const* peer, int peerCount, time_t const now)
{
tr_torrent const* tor = s->tor;
struct peer_atom const* atom = peer->atom;
/* if it's marked for purging, close it */
if (peer->doPurge)
{
tr_logAddTraceSwarm(s, fmt::format("purging peer {} because its doPurge flag is set", atom->readable()));
tr_logAddTraceSwarm(s, fmt::format("purging peer {} because its doPurge flag is set", peer->readable()));
return true;
}
auto const* tor = s->tor;
auto const* const atom = peer->atom;
/* disconnect if we're both seeds and enough time has passed for PEX */
if (tor->isDone() && tr_peerIsSeed(peer))
{
@ -2283,7 +2277,7 @@ static bool shouldPeerBeClosed(tr_swarm const* s, tr_peer const* peer, int peerC
{
tr_logAddTraceSwarm(
s,
fmt::format("purging peer {} because it's been {} secs since we shared anything", atom->readable(), idleTime));
fmt::format("purging peer {} because it's been {} secs since we shared anything", peer->readable(), idleTime));
return true;
}
}
@ -2296,7 +2290,7 @@ static void removePeer(tr_peer* peer)
auto* const s = peer->swarm;
auto const lock = s->manager->unique_lock();
struct peer_atom* atom = peer->atom;
auto* const atom = peer->atom;
TR_ASSERT(atom != nullptr);
atom->time = tr_time();
@ -2324,16 +2318,16 @@ static void closePeer(tr_peer* peer)
to them fruitlessly, so mark it as another fail */
if (auto* const atom = peer->atom; atom->piece_data_time != 0)
{
tr_logAddTraceSwarm(s, fmt::format("resetting atom {} num_fails to 0", atom->readable()));
tr_logAddTraceSwarm(s, fmt::format("resetting atom {} num_fails to 0", peer->readable()));
atom->num_fails = 0;
}
else
{
++atom->num_fails;
tr_logAddTraceSwarm(s, fmt::format("incremented atom {} num_fails to {}", atom->readable(), atom->num_fails));
tr_logAddTraceSwarm(s, fmt::format("incremented atom {} num_fails to {}", peer->readable(), atom->num_fails));
}
tr_logAddTraceSwarm(s, fmt::format("removing bad peer {}", peer->atom->readable()));
tr_logAddTraceSwarm(s, fmt::format("removing bad peer {}", peer->readable()));
removePeer(peer);
}

View File

@ -106,8 +106,6 @@ constexpr bool tr_isPex(tr_pex const* pex)
return pex && tr_address_is_valid(&pex->addr);
}
tr_address const* tr_peerAddress(tr_peer const*);
tr_peerMgr* tr_peerMgrNew(tr_session* session);
void tr_peerMgrFree(tr_peerMgr* manager);

View File

@ -371,6 +371,17 @@ public:
return io->time_created < timestamp;
}
[[nodiscard]] std::pair<tr_address, tr_port> socketAddress() const override
{
return io->socketAddress();
}
[[nodiscard]] std::string readable() const override
{
auto const [addr, port] = socketAddress();
return addr.readable(port);
}
void cancel_block_request(tr_block_index_t block) override
{
protocolSendCancel(this, blockToReq(torrent, block));
@ -1755,7 +1766,7 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, si
if (auto const dht_port = tr_port::fromNetwork(nport); !std::empty(dht_port))
{
msgs->dht_port = dht_port;
tr_dhtAddNode(msgs->session, tr_peerAddress(msgs), msgs->dht_port, false);
tr_dhtAddNode(msgs->session, &msgs->io->address(), msgs->dht_port, false);
}
}
break;

View File

@ -52,6 +52,8 @@ public:
[[nodiscard]] virtual bool is_connection_older_than(time_t time) const noexcept = 0;
[[nodiscard]] virtual std::pair<tr_address, tr_port> socketAddress() const = 0;
virtual void cancel_block_request(tr_block_index_t block) = 0;
virtual void set_choke(bool peer_is_choked) = 0;

View File

@ -198,6 +198,16 @@ public:
return is_active;
}
[[nodiscard]] std::string readable() const override
{
if (auto const parsed = tr_urlParse(base_url); parsed)
{
return fmt::format(FMT_STRING("{:s}:{:d}"), parsed->host, parsed->port);
}
return base_url;
}
void gotPieceData(uint32_t n_bytes)
{
bandwidth.notifyBandwidthConsumed(TR_DOWN, n_bytes, true, tr_time_msec());