fix: invalid tr_peerMsgsImpl::prefetchCount value (#3024)

* fix: invalid tr_peerMsgsImpl::prefetchCount value

This count could get corrupted due to imprecise bookkeeping when
processing peers' cancel messages. This PR replaces the field with
a `prefetched` flag in the struct that represents a peer request.

This approach gives simpler code and simpler logic, and it is harder to break.
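For reference, the sketch below shows the new bookkeeping in isolation. It is a simplified, self-contained adaptation of the code in this diff rather than the actual Transmission sources: `fakePrefetch()` and the `main()` driver are illustrative stand-ins for `tr_cachePrefetchBlock()` and the real message handlers.

```cpp
// Simplified sketch of the per-request `prefetched` flag (not Transmission code).
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <iterator>
#include <vector>

struct peer_request
{
    uint32_t index = 0;
    uint32_t offset = 0;
    uint32_t length = 0;

    [[nodiscard]] constexpr bool operator==(peer_request const& that) const noexcept
    {
        return this->index == that.index && this->offset == that.offset && this->length == that.length;
    }
};

// Each queued request carries its own flag, so removing a request
// can never desynchronize a separately-maintained prefetch count.
struct QueuedPeerRequest : public peer_request
{
    explicit QueuedPeerRequest(peer_request in) noexcept
        : peer_request{ in }
    {
    }

    bool prefetched = false;
};

// how many blocks to keep prefetched per peer
static auto constexpr PrefetchMax = size_t{ 18 };

// stand-in for tr_cachePrefetchBlock(); illustrative only
static void fakePrefetch(peer_request const& req)
{
    std::printf(
        "prefetching %u:%u->%u\n",
        static_cast<unsigned>(req.index),
        static_cast<unsigned>(req.offset),
        static_cast<unsigned>(req.length));
}

// ensure that the first `PrefetchMax` queued requests are prefetched,
// skipping the ones whose flag is already set
static void prefetchPieces(std::vector<QueuedPeerRequest>& requests)
{
    for (size_t i = 0, n = std::min(PrefetchMax, std::size(requests)); i < n; ++i)
    {
        if (auto& req = requests[i]; !req.prefetched)
        {
            fakePrefetch(req);
            req.prefetched = true;
        }
    }
}

int main()
{
    auto requests = std::vector<QueuedPeerRequest>{};
    requests.emplace_back(peer_request{ 0U, 0U, 16384U });
    requests.emplace_back(peer_request{ 0U, 16384U, 16384U });
    prefetchPieces(requests);

    // a peer's Cancel message simply erases the matching entry;
    // there is no counter left to fall out of sync
    auto const cancelled = peer_request{ 0U, 0U, 16384U };
    if (auto iter = std::find(std::begin(requests), std::end(requests), cancelled); iter != std::end(requests))
    {
        requests.erase(iter);
    }

    prefetchPieces(requests); // already-prefetched entries are skipped
    return 0;
}
```

Because the flag travels with the queued request itself, erasing an entry (as the Cancel handler now does) cannot leave a separate counter out of sync.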
Charles Kerr 2022-04-29 12:44:22 -05:00 committed by GitHub
parent d487a5d8d7
commit e4221916b1
4 changed files with 81 additions and 81 deletions


@@ -83,9 +83,6 @@ public:
/* number of bad pieces they've contributed to */
uint8_t strikes = 0;
/* how many requests the peer has made that we haven't responded to yet */
int pendingReqsToClient = 0;
tr_session* const session;
/* Hook to private peer-mgr information */


@@ -1699,7 +1699,7 @@ static auto getPeerStats(tr_peerMsgs const* peer, time_t now, uint64_t now_msec)
stats.cancelsToClient = peer->cancelsSentToClient.count(now, CancelHistorySec);
stats.pendingReqsToPeer = peer->swarm->active_requests.count(peer);
stats.pendingReqsToClient = peer->pendingReqsToClient;
stats.pendingReqsToClient = peer->pendingReqsToClient();
char* pch = stats.flagStr;


@@ -128,7 +128,7 @@ static auto constexpr HighPriorityIntervalSecs = int{ 2 };
static auto constexpr LowPriorityIntervalSecs = int{ 10 };
// how many blocks to keep prefetched per peer
static auto constexpr PrefetchSize = int{ 18 };
static auto constexpr PrefetchMax = size_t{ 18 };
// when we're making requests from another peer,
// batch them together to send enough requests to
@@ -163,9 +163,14 @@ enum class EncryptionPreference
struct peer_request
{
uint32_t index;
uint32_t offset;
uint32_t length;
uint32_t index = 0;
uint32_t offset = 0;
uint32_t length = 0;
[[nodiscard]] auto constexpr operator==(peer_request const& that) const noexcept
{
return this->index == that.index && this->offset == that.offset && this->length == that.length;
}
};
static peer_request blockToReq(tr_torrent const* tor, tr_block_index_t block)
@@ -230,7 +235,7 @@ using UniqueTimer = std::unique_ptr<struct event, EventDeleter>;
* @see struct peer_atom
* @see tr_peer
*/
class tr_peerMsgsImpl : public tr_peerMsgs
class tr_peerMsgsImpl final : public tr_peerMsgs
{
public:
tr_peerMsgsImpl(tr_torrent* torrent_in, peer_atom* atom_in, tr_peerIo* io_in, tr_peer_callback callback, void* callbackData)
@@ -310,6 +315,11 @@ public:
return Bps > 0;
}
[[nodiscard]] size_t pendingReqsToClient() const noexcept override
{
return std::size(peer_requested_);
}
[[nodiscard]] bool is_peer_choked() const noexcept override
{
return peer_is_choked_;
@@ -589,8 +599,6 @@ public:
size_t desired_request_count = 0;
int prefetchCount = 0;
/* how long the outMessages batch should be allowed to grow before
* it's flushed -- some messages (like requests >:) should be sent
* very quickly; others aren't as urgent. */
@@ -618,7 +626,17 @@ public:
evbuffer* const outMessages; /* all the non-piece messages */
struct peer_request peerAskedFor[ReqQ] = {};
struct QueuedPeerRequest : public peer_request
{
explicit QueuedPeerRequest(peer_request in) noexcept
: peer_request{ in }
{
}
bool prefetched = false;
};
std::vector<QueuedPeerRequest> peer_requested_;
int peerAskedForMetadata[MetadataReqQ] = {};
int peerAskedForMetadataCount = 0;
@@ -939,33 +957,17 @@ static bool popNextMetadataRequest(tr_peerMsgsImpl* msgs, int* piece)
return true;
}
static bool popNextRequest(tr_peerMsgsImpl* msgs, struct peer_request* setme)
{
if (msgs->pendingReqsToClient == 0)
{
return false;
}
*setme = msgs->peerAskedFor[0];
tr_removeElementFromArray(msgs->peerAskedFor, 0, sizeof(struct peer_request), msgs->pendingReqsToClient);
--msgs->pendingReqsToClient;
return true;
}
static void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs)
{
struct peer_request req;
bool const mustSendCancel = tr_peerIoSupportsFEXT(msgs->io);
while (popNextRequest(msgs, &req))
if (auto const must_send_rej = tr_peerIoSupportsFEXT(msgs->io); must_send_rej)
{
if (mustSendCancel)
for (auto& req : msgs->peer_requested_)
{
protocolSendReject(msgs, &req);
}
}
msgs->peer_requested_.clear();
}
/**
@@ -1432,58 +1434,59 @@ static void prefetchPieces(tr_peerMsgsImpl* msgs)
return;
}
for (int i = msgs->prefetchCount; i < msgs->pendingReqsToClient && i < PrefetchSize; ++i)
// ensure that the first `PrefetchMax` items in `msgs->peer_requested_` are prefetched.
auto& requests = msgs->peer_requested_;
for (size_t i = 0, n = std::min(PrefetchMax, std::size(requests)); i < n; ++i)
{
struct peer_request const* req = msgs->peerAskedFor + i;
if (requestIsValid(msgs, req))
if (auto& req = requests[i]; !req.prefetched)
{
tr_cachePrefetchBlock(
msgs->session->cache,
msgs->torrent,
msgs->torrent->pieceLoc(req->index, req->offset),
req->length);
++msgs->prefetchCount;
msgs->torrent->pieceLoc(req.index, req.offset),
req.length);
req.prefetched = true;
}
}
}
static void peerMadeRequest(tr_peerMsgsImpl* msgs, struct peer_request const* req)
[[nodiscard]] static bool canAddRequestFromPeer(tr_peerMsgsImpl const* const msgs, struct peer_request const& req)
{
bool const fext = tr_peerIoSupportsFEXT(msgs->io);
bool const reqIsValid = requestIsValid(msgs, req);
bool const clientHasPiece = reqIsValid && msgs->torrent->hasPiece(req->index);
bool const peerIsChoked = msgs->peer_is_choked_;
bool allow = false;
if (!reqIsValid)
{
logtrace(msgs, "rejecting an invalid request.");
}
else if (!clientHasPiece)
{
logtrace(msgs, "rejecting request for a piece we don't have.");
}
else if (peerIsChoked)
if (msgs->peer_is_choked_)
{
logtrace(msgs, "rejecting request from choked peer");
}
else if (msgs->pendingReqsToClient + 1 >= ReqQ)
{
logtrace(msgs, "rejecting request ... reqq is full");
}
else
{
allow = true;
return false;
}
if (allow)
if (std::size(msgs->peer_requested_) >= ReqQ)
{
msgs->peerAskedFor[msgs->pendingReqsToClient++] = *req;
logtrace(msgs, "rejecting request ... reqq is full");
return false;
}
if (!tr_torrentReqIsValid(msgs->torrent, req.index, req.offset, req.length))
{
logtrace(msgs, "rejecting an invalid request.");
return false;
}
if (!msgs->torrent->hasPiece(req.index))
{
logtrace(msgs, "rejecting request for a piece we don't have.");
return false;
}
return true;
}
static void peerMadeRequest(tr_peerMsgsImpl* msgs, struct peer_request const* req)
{
if (canAddRequestFromPeer(msgs, *req))
{
msgs->peer_requested_.emplace_back(*req);
prefetchPieces(msgs);
}
else if (fext)
else if (tr_peerIoSupportsFEXT(msgs->io))
{
protocolSendReject(msgs, req);
}
@@ -1724,23 +1727,19 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, si
msgs->cancelsSentToClient.add(tr_time(), 1);
logtrace(msgs, fmt::format(FMT_STRING("got a Cancel {:d}:{:d}->{:d}"), r.index, r.offset, r.length));
for (int i = 0; i < msgs->pendingReqsToClient; ++i)
auto& requests = msgs->peer_requested_;
if (auto iter = std::find(std::begin(requests), std::end(requests), r); iter != std::end(requests))
{
struct peer_request const* req = msgs->peerAskedFor + i;
requests.erase(iter);
if (req->index == r.index && req->offset == r.offset && req->length == r.length)
// bep6: "Even when a request is cancelled, the peer
// receiving the cancel should respond with either the
// corresponding reject or the corresponding piece"
if (fext)
{
tr_removeElementFromArray(msgs->peerAskedFor, i, sizeof(struct peer_request), msgs->pendingReqsToClient);
--msgs->pendingReqsToClient;
if (fext)
{
protocolSendReject(msgs, &r);
}
break;
protocolSendReject(msgs, &r);
}
}
break;
}
@@ -2196,9 +2195,10 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now)
*** Data Blocks
**/
if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= tr_block_info::BlockSize && popNextRequest(msgs, &req))
if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= tr_block_info::BlockSize && !std::empty(msgs->peer_requested_))
{
--msgs->prefetchCount;
req = msgs->peer_requested_.front();
msgs->peer_requested_.erase(std::begin(msgs->peer_requested_));
if (requestIsValid(msgs, &req) && msgs->torrent->hasPiece(req.index))
{


@@ -35,6 +35,9 @@ public:
virtual ~tr_peerMsgs() override = default;
/* how many requests the peer has made that we haven't responded to yet */
[[nodiscard]] virtual size_t pendingReqsToClient() const noexcept = 0;
[[nodiscard]] virtual bool is_peer_choked() const noexcept = 0;
[[nodiscard]] virtual bool is_peer_interested() const noexcept = 0;
[[nodiscard]] virtual bool is_client_choked() const noexcept = 0;