From e4221916b1360ddaf6fe93385d08bd488bc571bd Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Fri, 29 Apr 2022 12:44:22 -0500 Subject: [PATCH] fix: invalid tr_peerMsgsImpl::prefetchCount value (#3024) * fix: invalid tr_peerMsgsImpl::prefetchCount value This count could get corrupted due to imprecise bookkeeping when processing peers' cancel messages. This PR replaces the field with a `prefetched` flag in the struct that represents a peer request. This approach has simpler code, simpler logic, and is harder to break. --- libtransmission/peer-common.h | 3 - libtransmission/peer-mgr.cc | 2 +- libtransmission/peer-msgs.cc | 154 +++++++++++++++++----------------- libtransmission/peer-msgs.h | 3 + 4 files changed, 81 insertions(+), 81 deletions(-) diff --git a/libtransmission/peer-common.h b/libtransmission/peer-common.h index 18527f561..1f3b5940d 100644 --- a/libtransmission/peer-common.h +++ b/libtransmission/peer-common.h @@ -83,9 +83,6 @@ public: /* number of bad pieces they've contributed to */ uint8_t strikes = 0; - /* how many requests the peer has made that we haven't responded to yet */ - int pendingReqsToClient = 0; - tr_session* const session; /* Hook to private peer-mgr information */ diff --git a/libtransmission/peer-mgr.cc b/libtransmission/peer-mgr.cc index 893b769ca..f3744c3e1 100644 --- a/libtransmission/peer-mgr.cc +++ b/libtransmission/peer-mgr.cc @@ -1699,7 +1699,7 @@ static auto getPeerStats(tr_peerMsgs const* peer, time_t now, uint64_t now_msec) stats.cancelsToClient = peer->cancelsSentToClient.count(now, CancelHistorySec); stats.pendingReqsToPeer = peer->swarm->active_requests.count(peer); - stats.pendingReqsToClient = peer->pendingReqsToClient; + stats.pendingReqsToClient = peer->pendingReqsToClient(); char* pch = stats.flagStr; diff --git a/libtransmission/peer-msgs.cc b/libtransmission/peer-msgs.cc index f983858e1..8b0d0700e 100644 --- a/libtransmission/peer-msgs.cc +++ b/libtransmission/peer-msgs.cc @@ -128,7 +128,7 @@ static auto constexpr HighPriorityIntervalSecs = int{ 2 }; static auto constexpr LowPriorityIntervalSecs = int{ 10 }; // how many blocks to keep prefetched per peer -static auto constexpr PrefetchSize = int{ 18 }; +static auto constexpr PrefetchMax = size_t{ 18 }; // when we're making requests from another peer, // batch them together to send enough requests to @@ -163,9 +163,14 @@ enum class EncryptionPreference struct peer_request { - uint32_t index; - uint32_t offset; - uint32_t length; + uint32_t index = 0; + uint32_t offset = 0; + uint32_t length = 0; + + [[nodiscard]] auto constexpr operator==(peer_request const& that) const noexcept + { + return this->index == that.index && this->offset == that.offset && this->length == that.length; + } }; static peer_request blockToReq(tr_torrent const* tor, tr_block_index_t block) @@ -230,7 +235,7 @@ using UniqueTimer = std::unique_ptr; * @see struct peer_atom * @see tr_peer */ -class tr_peerMsgsImpl : public tr_peerMsgs +class tr_peerMsgsImpl final : public tr_peerMsgs { public: tr_peerMsgsImpl(tr_torrent* torrent_in, peer_atom* atom_in, tr_peerIo* io_in, tr_peer_callback callback, void* callbackData) @@ -310,6 +315,11 @@ public: return Bps > 0; } + [[nodiscard]] size_t pendingReqsToClient() const noexcept override + { + return std::size(peer_requested_); + } + [[nodiscard]] bool is_peer_choked() const noexcept override { return peer_is_choked_; @@ -589,8 +599,6 @@ public: size_t desired_request_count = 0; - int prefetchCount = 0; - /* how long the outMessages batch should be allowed to grow before * it's flushed -- some messages (like requests >:) should be sent * very quickly; others aren't as urgent. */ @@ -618,7 +626,17 @@ public: evbuffer* const outMessages; /* all the non-piece messages */ - struct peer_request peerAskedFor[ReqQ] = {}; + struct QueuedPeerRequest : public peer_request + { + explicit QueuedPeerRequest(peer_request in) noexcept + : peer_request{ in } + { + } + + bool prefetched = false; + }; + + std::vector peer_requested_; int peerAskedForMetadata[MetadataReqQ] = {}; int peerAskedForMetadataCount = 0; @@ -939,33 +957,17 @@ static bool popNextMetadataRequest(tr_peerMsgsImpl* msgs, int* piece) return true; } -static bool popNextRequest(tr_peerMsgsImpl* msgs, struct peer_request* setme) -{ - if (msgs->pendingReqsToClient == 0) - { - return false; - } - - *setme = msgs->peerAskedFor[0]; - - tr_removeElementFromArray(msgs->peerAskedFor, 0, sizeof(struct peer_request), msgs->pendingReqsToClient); - --msgs->pendingReqsToClient; - - return true; -} - static void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs) { - struct peer_request req; - bool const mustSendCancel = tr_peerIoSupportsFEXT(msgs->io); - - while (popNextRequest(msgs, &req)) + if (auto const must_send_rej = tr_peerIoSupportsFEXT(msgs->io); must_send_rej) { - if (mustSendCancel) + for (auto& req : msgs->peer_requested_) { protocolSendReject(msgs, &req); } } + + msgs->peer_requested_.clear(); } /** @@ -1432,58 +1434,59 @@ static void prefetchPieces(tr_peerMsgsImpl* msgs) return; } - for (int i = msgs->prefetchCount; i < msgs->pendingReqsToClient && i < PrefetchSize; ++i) + // ensure that the first `PrefetchMax` items in `msgs->peer_requested_` are prefetched. + auto& requests = msgs->peer_requested_; + for (size_t i = 0, n = std::min(PrefetchMax, std::size(requests)); i < n; ++i) { - struct peer_request const* req = msgs->peerAskedFor + i; - - if (requestIsValid(msgs, req)) + if (auto& req = requests[i]; !req.prefetched) { tr_cachePrefetchBlock( msgs->session->cache, msgs->torrent, - msgs->torrent->pieceLoc(req->index, req->offset), - req->length); - ++msgs->prefetchCount; + msgs->torrent->pieceLoc(req.index, req.offset), + req.length); + req.prefetched = true; } } } -static void peerMadeRequest(tr_peerMsgsImpl* msgs, struct peer_request const* req) +[[nodiscard]] static bool canAddRequestFromPeer(tr_peerMsgsImpl const* const msgs, struct peer_request const& req) { - bool const fext = tr_peerIoSupportsFEXT(msgs->io); - bool const reqIsValid = requestIsValid(msgs, req); - bool const clientHasPiece = reqIsValid && msgs->torrent->hasPiece(req->index); - bool const peerIsChoked = msgs->peer_is_choked_; - - bool allow = false; - - if (!reqIsValid) - { - logtrace(msgs, "rejecting an invalid request."); - } - else if (!clientHasPiece) - { - logtrace(msgs, "rejecting request for a piece we don't have."); - } - else if (peerIsChoked) + if (msgs->peer_is_choked_) { logtrace(msgs, "rejecting request from choked peer"); - } - else if (msgs->pendingReqsToClient + 1 >= ReqQ) - { - logtrace(msgs, "rejecting request ... reqq is full"); - } - else - { - allow = true; + return false; } - if (allow) + if (std::size(msgs->peer_requested_) >= ReqQ) { - msgs->peerAskedFor[msgs->pendingReqsToClient++] = *req; + logtrace(msgs, "rejecting request ... reqq is full"); + return false; + } + + if (!tr_torrentReqIsValid(msgs->torrent, req.index, req.offset, req.length)) + { + logtrace(msgs, "rejecting an invalid request."); + return false; + } + + if (!msgs->torrent->hasPiece(req.index)) + { + logtrace(msgs, "rejecting request for a piece we don't have."); + return false; + } + + return true; +} + +static void peerMadeRequest(tr_peerMsgsImpl* msgs, struct peer_request const* req) +{ + if (canAddRequestFromPeer(msgs, *req)) + { + msgs->peer_requested_.emplace_back(*req); prefetchPieces(msgs); } - else if (fext) + else if (tr_peerIoSupportsFEXT(msgs->io)) { protocolSendReject(msgs, req); } @@ -1724,23 +1727,19 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, si msgs->cancelsSentToClient.add(tr_time(), 1); logtrace(msgs, fmt::format(FMT_STRING("got a Cancel {:d}:{:d}->{:d}"), r.index, r.offset, r.length)); - for (int i = 0; i < msgs->pendingReqsToClient; ++i) + auto& requests = msgs->peer_requested_; + if (auto iter = std::find(std::begin(requests), std::end(requests), r); iter != std::end(requests)) { - struct peer_request const* req = msgs->peerAskedFor + i; + requests.erase(iter); - if (req->index == r.index && req->offset == r.offset && req->length == r.length) + // bep6: "Even when a request is cancelled, the peer + // receiving the cancel should respond with either the + // corresponding reject or the corresponding piece" + if (fext) { - tr_removeElementFromArray(msgs->peerAskedFor, i, sizeof(struct peer_request), msgs->pendingReqsToClient); - --msgs->pendingReqsToClient; - if (fext) - { - protocolSendReject(msgs, &r); - } - - break; + protocolSendReject(msgs, &r); } } - break; } @@ -2196,9 +2195,10 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now) *** Data Blocks **/ - if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= tr_block_info::BlockSize && popNextRequest(msgs, &req)) + if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= tr_block_info::BlockSize && !std::empty(msgs->peer_requested_)) { - --msgs->prefetchCount; + req = msgs->peer_requested_.front(); + msgs->peer_requested_.erase(std::begin(msgs->peer_requested_)); if (requestIsValid(msgs, &req) && msgs->torrent->hasPiece(req.index)) { diff --git a/libtransmission/peer-msgs.h b/libtransmission/peer-msgs.h index 2e7eb4f6d..d3fc4febc 100644 --- a/libtransmission/peer-msgs.h +++ b/libtransmission/peer-msgs.h @@ -35,6 +35,9 @@ public: virtual ~tr_peerMsgs() override = default; + /* how many requests the peer has made that we haven't responded to yet */ + [[nodiscard]] virtual size_t pendingReqsToClient() const noexcept = 0; + [[nodiscard]] virtual bool is_peer_choked() const noexcept = 0; [[nodiscard]] virtual bool is_peer_interested() const noexcept = 0; [[nodiscard]] virtual bool is_client_choked() const noexcept = 0;