perf: restore `3.00` wishlist with cached candidates (#6549)

* feat: rewrite Wishlist to cache candidates

* feat: implement mediator and observers

* feat: rewrite existing tests to work for current implementation

* fix: incorrect block spans for existing tests

* feat: add new tests for new features

* fix: clang shadow warning

* fix: heap-use-after-free in tests

* fixup! feat: rewrite Wishlist to cache candidates

* chore: update comment

* code review: reserve vector memory

---------

Co-authored-by: Charles Kerr <charles@charleskerr.com>
This commit is contained in:
Yat Ho 2024-02-05 14:14:34 +08:00 committed by GitHub
parent ca11c33d05
commit 168d56cefc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 1578 additions and 384 deletions

View File

@ -3,8 +3,9 @@
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#include <algorithm> // std::min, std::partial_sort
#include <algorithm> // std::adjacent_find
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>
@ -14,136 +15,73 @@
#include "libtransmission/transmission.h"
#include "libtransmission/bitfield.h"
#include "libtransmission/crypto-utils.h" // for tr_salt_shaker
#include "libtransmission/peer-mgr-wishlist.h"
#include "libtransmission/utils.h"
namespace
{
using SaltType = tr_piece_index_t;
struct Candidate
std::vector<tr_block_span_t> make_spans(small::set<tr_block_index_t> const& blocks)
{
tr_piece_index_t piece;
size_t n_blocks_missing;
tr_priority_t priority;
SaltType salt;
Candidate(tr_piece_index_t piece_in, size_t missing_in, tr_priority_t priority_in, SaltType salt_in)
: piece{ piece_in }
, n_blocks_missing{ missing_in }
, priority{ priority_in }
, salt{ salt_in }
{
}
[[nodiscard]] constexpr auto compare(Candidate const& that) const noexcept // <=>
{
// prefer pieces closer to completion
if (auto const val = tr_compare_3way(n_blocks_missing, that.n_blocks_missing); val != 0)
{
return val;
}
// prefer higher priority
if (auto const val = tr_compare_3way(priority, that.priority); val != 0)
{
return -val;
}
return tr_compare_3way(salt, that.salt);
}
bool operator<(Candidate const& that) const // less than
{
return compare(that) < 0;
}
};
std::vector<Candidate> getCandidates(Wishlist::Mediator const& mediator)
{
// count up the pieces that we still want
auto wanted_pieces = std::vector<std::pair<tr_piece_index_t, size_t>>{};
auto const n_pieces = mediator.countAllPieces();
wanted_pieces.reserve(n_pieces);
for (tr_piece_index_t i = 0; i < n_pieces; ++i)
{
if (!mediator.clientCanRequestPiece(i))
{
continue;
}
size_t const n_missing = mediator.countMissingBlocks(i);
if (n_missing == 0)
{
continue;
}
wanted_pieces.emplace_back(i, n_missing);
}
// transform them into candidates
auto salter = tr_salt_shaker<SaltType>{};
auto const n = std::size(wanted_pieces);
auto candidates = std::vector<Candidate>{};
auto const is_sequential = mediator.isSequentialDownload();
candidates.reserve(n);
for (size_t i = 0; i < n; ++i)
{
auto const [piece, n_missing] = wanted_pieces[i];
auto const salt = is_sequential ? piece : salter();
candidates.emplace_back(piece, n_missing, mediator.priority(piece), salt);
}
return candidates;
}
std::vector<tr_block_span_t> makeSpans(tr_block_index_t const* sorted_blocks, size_t n_blocks)
{
if (n_blocks == 0)
if (std::empty(blocks))
{
return {};
}
auto spans = std::vector<tr_block_span_t>{};
auto cur = tr_block_span_t{ sorted_blocks[0], sorted_blocks[0] + 1 };
for (size_t i = 1; i < n_blocks; ++i)
spans.reserve(std::size(blocks));
for (auto span_begin = std::begin(blocks), end = std::end(blocks); span_begin != end;)
{
if (cur.end == sorted_blocks[i])
static auto constexpr NotAdjacent = [](tr_block_index_t const lhs, tr_block_index_t const rhs)
{
++cur.end;
}
else
return lhs + 1U != rhs;
};
auto span_end = std::adjacent_find(span_begin, end, NotAdjacent);
if (span_end == end)
{
spans.push_back(cur);
cur = tr_block_span_t{ sorted_blocks[i], sorted_blocks[i] + 1 };
--span_end;
}
spans.push_back({ *span_begin, *span_end + 1 });
span_begin = std::next(span_end);
}
spans.push_back(cur);
return spans;
}
} // namespace
std::vector<tr_block_span_t> Wishlist::next(size_t n_wanted_blocks)
Wishlist::Wishlist(std::unique_ptr<Mediator> mediator_in)
: tags_{ {
mediator_in->observe_peer_disconnect([this](tr_torrent*, tr_bitfield const& b) { dec_replication_from_bitfield(b); }),
mediator_in->observe_got_bitfield([this](tr_torrent*, tr_bitfield const& b) { inc_replication_from_bitfield(b); }),
mediator_in->observe_got_block([this](tr_torrent*, tr_piece_index_t p, tr_block_index_t) { resort_piece(p); }),
mediator_in->observe_got_have([this](tr_torrent*, tr_piece_index_t p) { inc_replication_piece(p); }),
mediator_in->observe_got_have_all([this](tr_torrent*) { inc_replication(); }),
mediator_in->observe_piece_completed([this](tr_torrent*, tr_piece_index_t p) { remove_piece(p); }),
mediator_in->observe_priority_changed([this](tr_torrent*, tr_file_index_t const*, tr_file_index_t, tr_priority_t)
{ set_candidates_dirty(); }),
mediator_in->observe_sequential_download_changed([this](tr_torrent*, bool) { set_candidates_dirty(); }),
} }
, mediator_{ std::move(mediator_in) }
{
if (n_wanted_blocks == 0)
}
std::vector<tr_block_span_t> Wishlist::next(
size_t n_wanted_blocks,
std::function<bool(tr_piece_index_t)> const& peer_has_piece,
std::function<bool(tr_block_index_t)> const& has_active_pending_to_peer)
{
if (n_wanted_blocks == 0U)
{
return {};
}
auto candidates = getCandidates(mediator_);
maybe_rebuild_candidate_list();
// We usually won't need all the candidates to be sorted until endgame, so don't
// waste cycles sorting all of them here. partial sort is enough.
static auto constexpr MaxSortedPieces = size_t{ 30U };
auto const middle = std::min(std::size(candidates), MaxSortedPieces);
std::partial_sort(std::begin(candidates), std::begin(candidates) + middle, std::end(candidates));
auto blocks = small::set<tr_block_index_t, 4096U>{};
auto blocks = small::set<tr_block_index_t>{};
blocks.reserve(n_wanted_blocks);
for (auto const& candidate : candidates)
for (auto const& candidate : candidates_)
{
// do we have enough?
if (std::size(blocks) >= n_wanted_blocks)
@ -151,19 +89,27 @@ std::vector<tr_block_span_t> Wishlist::next(size_t n_wanted_blocks)
break;
}
// walk the blocks in this piece
auto const [begin, end] = mediator_.blockSpan(candidate.piece);
for (tr_block_index_t block = begin; block < end && std::size(blocks) < n_wanted_blocks; ++block)
// if the peer doesn't have this piece that we want...
if (!peer_has_piece(candidate.piece))
{
// don't request blocks we've already got
if (!mediator_.clientCanRequestBlock(block))
continue;
}
// walk the blocks in this piece
for (auto [block, end] = mediator_->block_span(candidate.piece); block < end && std::size(blocks) < n_wanted_blocks;
++block)
{
// don't request blocks that:
// 1. we've already got, or
// 2. already has an active request to that peer
if (mediator_->client_has_block(block) || has_active_pending_to_peer(block))
{
continue;
}
// don't request from too many peers
size_t const n_peers = mediator_.countActiveRequests(block);
if (size_t const max_peers = mediator_.isEndgame() ? 2 : 1; n_peers >= max_peers)
auto const n_peers = mediator_->count_active_requests(block);
if (auto const max_peers = mediator_->is_endgame() ? EndgameMaxPeers : NormalMaxPeers; n_peers >= max_peers)
{
continue;
}
@ -172,6 +118,210 @@ std::vector<tr_block_span_t> Wishlist::next(size_t n_wanted_blocks)
}
}
auto const blocks_v = std::vector<tr_block_index_t>{ std::begin(blocks), std::end(blocks) };
return makeSpans(std::data(blocks_v), std::size(blocks_v));
return make_spans(blocks);
}
void Wishlist::maybe_rebuild_candidate_list()
{
if (!candidates_dirty_)
{
return;
}
candidates_dirty_ = false;
candidates_.clear();
auto salter = tr_salt_shaker<tr_piece_index_t>{};
auto const is_sequential = mediator_->is_sequential_download();
auto const n_pieces = mediator_->piece_count();
candidates_.reserve(n_pieces);
for (tr_piece_index_t piece = 0U; piece < n_pieces; ++piece)
{
if (mediator_->count_missing_blocks(piece) <= 0U || !mediator_->client_wants_piece(piece))
{
continue;
}
auto const salt = is_sequential ? piece : salter();
candidates_
.emplace_back(piece, mediator_->count_piece_replication(piece), mediator_->priority(piece), salt, mediator_.get());
}
std::sort(std::begin(candidates_), std::end(candidates_));
}
Wishlist::CandidateVec::iterator Wishlist::piece_lookup(tr_piece_index_t const piece)
{
return std::find_if(
std::begin(candidates_),
std::end(candidates_),
[piece](auto const& candidate) { return candidate.piece == piece; });
}
void Wishlist::dec_replication()
{
if (!candidates_dirty_)
{
std::for_each(
std::begin(candidates_),
std::end(candidates_),
[](Candidate& candidate)
{
TR_ASSERT(candidate.replication > 0U);
--candidate.replication;
});
}
}
void Wishlist::dec_replication_from_bitfield(tr_bitfield const& bitfield)
{
if (candidates_dirty_)
{
return;
}
if (bitfield.has_none())
{
return;
}
if (bitfield.has_all())
{
dec_replication();
return;
}
for (auto& candidate : candidates_)
{
if (bitfield.test(candidate.piece))
{
TR_ASSERT(candidate.replication > 0U);
--candidate.replication;
}
}
std::sort(std::begin(candidates_), std::end(candidates_));
}
void Wishlist::inc_replication()
{
if (!candidates_dirty_)
{
std::for_each(std::begin(candidates_), std::end(candidates_), [](Candidate& candidate) { ++candidate.replication; });
}
}
void Wishlist::inc_replication_from_bitfield(tr_bitfield const& bitfield)
{
if (candidates_dirty_)
{
return;
}
if (bitfield.has_none())
{
return;
}
if (bitfield.has_all())
{
inc_replication();
return;
}
for (auto& candidate : candidates_)
{
if (bitfield.test(candidate.piece))
{
++candidate.replication;
}
}
std::sort(std::begin(candidates_), std::end(candidates_));
}
void Wishlist::inc_replication_piece(tr_piece_index_t piece)
{
if (candidates_dirty_)
{
return;
}
if (auto iter = piece_lookup(piece); iter != std::end(candidates_))
{
++iter->replication;
resort_piece(iter);
}
}
void Wishlist::resort_piece(tr_piece_index_t const piece)
{
if (candidates_dirty_)
{
return;
}
if (auto iter = piece_lookup(piece); iter != std::end(candidates_))
{
resort_piece(iter);
}
}
void Wishlist::resort_piece(CandidateVec::iterator const pos_old)
{
if (candidates_dirty_)
{
return;
}
TR_ASSERT(pos_old != std::end(candidates_));
if (auto const pos_next = std::next(pos_old); std::is_sorted(
pos_old == std::begin(candidates_) ? pos_old : std::prev(pos_old),
pos_next == std::end(candidates_) ? pos_next : std::next(pos_next)))
{
return;
}
auto const tmp = *pos_old;
candidates_.erase(pos_old);
auto const pos_new = std::lower_bound(std::begin(candidates_), std::end(candidates_), tmp);
candidates_.insert(pos_new, tmp);
}
void Wishlist::remove_piece(tr_piece_index_t const piece)
{
if (candidates_dirty_)
{
return;
}
if (auto iter = piece_lookup(piece); iter != std::end(candidates_))
{
candidates_.erase(iter);
}
}
// ---
int Wishlist::Candidate::compare(Wishlist::Candidate const& that) const noexcept
{
// prefer pieces closer to completion
if (auto const val = tr_compare_3way(mediator_->count_missing_blocks(piece), mediator_->count_missing_blocks(that.piece));
val != 0)
{
return val;
}
// prefer higher priority
if (auto const val = tr_compare_3way(priority, that.priority); val != 0)
{
return -val;
}
// prefer rarer pieces
if (auto const val = tr_compare_3way(replication, that.replication); val != 0)
{
return val;
}
return tr_compare_3way(salt, that.salt);
}

View File

@ -10,10 +10,17 @@
#endif
#include <cstddef> // size_t
#include <functional>
#include <memory>
#include <vector>
#include "libtransmission/transmission.h"
#include "libtransmission/observable.h"
#include "libtransmission/utils.h"
class tr_bitfield;
/**
* Figures out what blocks we want to request next.
*/
@ -21,29 +28,117 @@ class Wishlist
{
public:
static auto constexpr EndgameMaxPeers = size_t{ 2U };
static auto constexpr NormalMaxPeers = size_t{ 1U };
struct Mediator
{
[[nodiscard]] virtual bool clientCanRequestBlock(tr_block_index_t block) const = 0;
[[nodiscard]] virtual bool clientCanRequestPiece(tr_piece_index_t piece) const = 0;
[[nodiscard]] virtual bool isEndgame() const = 0;
[[nodiscard]] virtual bool isSequentialDownload() const = 0;
[[nodiscard]] virtual size_t countActiveRequests(tr_block_index_t block) const = 0;
[[nodiscard]] virtual size_t countMissingBlocks(tr_piece_index_t piece) const = 0;
[[nodiscard]] virtual tr_block_span_t blockSpan(tr_piece_index_t) const = 0;
[[nodiscard]] virtual tr_piece_index_t countAllPieces() const = 0;
[[nodiscard]] virtual tr_priority_t priority(tr_piece_index_t) const = 0;
[[nodiscard]] virtual bool client_has_block(tr_block_index_t block) const = 0;
[[nodiscard]] virtual bool client_wants_piece(tr_piece_index_t piece) const = 0;
[[nodiscard]] virtual bool is_endgame() const = 0;
[[nodiscard]] virtual bool is_sequential_download() const = 0;
[[nodiscard]] virtual size_t count_active_requests(tr_block_index_t block) const = 0;
[[nodiscard]] virtual size_t count_missing_blocks(tr_piece_index_t piece) const = 0;
[[nodiscard]] virtual size_t count_piece_replication(tr_piece_index_t piece) const = 0;
[[nodiscard]] virtual tr_block_span_t block_span(tr_piece_index_t piece) const = 0;
[[nodiscard]] virtual tr_piece_index_t piece_count() const = 0;
[[nodiscard]] virtual tr_priority_t priority(tr_piece_index_t piece) const = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_peer_disconnect(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_got_bitfield(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_got_block(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t, tr_block_index_t>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_got_have(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_got_have_all(
libtransmission::SimpleObservable<tr_torrent*>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_piece_completed(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_priority_changed(
libtransmission::SimpleObservable<tr_torrent*, tr_file_index_t const*, tr_file_index_t, tr_priority_t>::Observer
observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_sequential_download_changed(
libtransmission::SimpleObservable<tr_torrent*, bool>::Observer observer) = 0;
virtual ~Mediator() = default;
};
constexpr explicit Wishlist(Mediator const& mediator)
: mediator_{ mediator }
private:
struct Candidate
{
Candidate(
tr_piece_index_t piece_in,
size_t replication_in,
tr_priority_t priority_in,
tr_piece_index_t salt_in,
Mediator const* mediator)
: piece{ piece_in }
, replication{ replication_in }
, priority{ priority_in }
, salt{ salt_in }
, mediator_{ mediator }
{
}
[[nodiscard]] int compare(Candidate const& that) const noexcept; // <=>
[[nodiscard]] auto operator<(Candidate const& that) const // less than
{
return compare(that) < 0;
}
tr_piece_index_t piece;
// Caching the following 2 values are highly beneficial, because:
// - they are often used (mainly because resort_piece() is called
// every time we receive a block)
// - does not change as often compared to missing blocks
// - calculating their values involves sifting through bitfield(s),
// which is expensive.
size_t replication;
tr_priority_t priority;
tr_piece_index_t salt;
private:
Mediator const* mediator_;
};
public:
explicit Wishlist(std::unique_ptr<Mediator> mediator_in);
constexpr void set_candidates_dirty() noexcept
{
candidates_dirty_ = true;
}
// the next blocks that we should request from a peer
[[nodiscard]] std::vector<tr_block_span_t> next(size_t n_wanted_blocks);
[[nodiscard]] std::vector<tr_block_span_t> next(
size_t n_wanted_blocks,
std::function<bool(tr_piece_index_t)> const& peer_has_piece,
std::function<bool(tr_block_index_t)> const& has_active_pending_to_peer);
void dec_replication();
void dec_replication_from_bitfield(tr_bitfield const& bitfield);
void inc_replication();
void inc_replication_from_bitfield(tr_bitfield const& bitfield);
void inc_replication_piece(tr_piece_index_t piece);
void remove_piece(tr_piece_index_t piece);
void resort_piece(tr_piece_index_t piece);
private:
Mediator const& mediator_;
using CandidateVec = std::vector<Candidate>;
CandidateVec::iterator piece_lookup(tr_piece_index_t piece);
void maybe_rebuild_candidate_list();
void resort_piece(CandidateVec::iterator pos_old);
CandidateVec candidates_;
bool candidates_dirty_ = true;
std::array<libtransmission::ObserverTag, 8U> const tags_;
std::unique_ptr<Mediator> const mediator_;
};

View File

@ -285,6 +285,49 @@ public:
using Peers = std::vector<tr_peerMsgs*>;
using Pool = std::unordered_map<tr_socket_address, tr_peer_info>;
class WishlistMediator final : public Wishlist::Mediator
{
public:
explicit WishlistMediator(tr_swarm& swarm)
: tor_{ *swarm.tor }
, swarm_{ swarm }
{
}
[[nodiscard]] bool client_has_block(tr_block_index_t block) const override;
[[nodiscard]] bool client_wants_piece(tr_piece_index_t piece) const override;
[[nodiscard]] bool is_endgame() const override;
[[nodiscard]] bool is_sequential_download() const override;
[[nodiscard]] size_t count_active_requests(tr_block_index_t block) const override;
[[nodiscard]] size_t count_missing_blocks(tr_piece_index_t piece) const override;
[[nodiscard]] size_t count_piece_replication(tr_piece_index_t piece) const override;
[[nodiscard]] tr_block_span_t block_span(tr_piece_index_t piece) const override;
[[nodiscard]] tr_piece_index_t piece_count() const override;
[[nodiscard]] tr_priority_t priority(tr_piece_index_t piece) const override;
[[nodiscard]] libtransmission::ObserverTag observe_peer_disconnect(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_got_bitfield(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_got_block(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t, tr_block_index_t>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_got_have(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_got_have_all(
libtransmission::SimpleObservable<tr_torrent*>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_piece_completed(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_priority_changed(
libtransmission::SimpleObservable<tr_torrent*, tr_file_index_t const*, tr_file_index_t, tr_priority_t>::Observer
observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_sequential_download_changed(
libtransmission::SimpleObservable<tr_torrent*, bool>::Observer observer) override;
private:
tr_torrent& tor_;
tr_swarm& swarm_;
};
[[nodiscard]] auto unique_lock() const
{
return tor->unique_lock();
@ -372,6 +415,8 @@ public:
{
auto const lock = unique_lock();
peer_disconnect.emit(tor, peer->has());
auto* const peer_info = peer->peer_info;
auto const socket_address = peer->socket_address();
[[maybe_unused]] auto const is_incoming = peer->is_incoming_connection();
@ -504,11 +549,19 @@ public:
break;
case tr_peer_event::Type::ClientGotHave:
s->got_have.emit(s->tor, event.pieceIndex);
break;
case tr_peer_event::Type::ClientGotHaveAll:
s->got_have_all.emit(s->tor);
break;
case tr_peer_event::Type::ClientGotHaveNone:
// no-op
break;
case tr_peer_event::Type::ClientGotBitfield:
/* TODO: if we don't need these, should these events be removed? */
/* noop */
s->got_bitfield.emit(s->tor, msgs->has());
break;
case tr_peer_event::Type::ClientGotChoke:
@ -544,6 +597,12 @@ public:
}
}
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&> peer_disconnect;
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&> got_bitfield;
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t, tr_block_index_t> got_block;
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t> got_have;
libtransmission::SimpleObservable<tr_torrent*> got_have_all;
mutable tr_swarm_stats stats = {};
uint8_t optimistic_unchoke_time_scaler = 0;
@ -562,6 +621,8 @@ public:
// depends-on: active_requests
Peers peers;
std::unique_ptr<Wishlist> wishlist;
// tr_peerMsgs hold pointers to the items in these containers,
// therefore references to elements within cannot invalidate
Pool incoming_pool;
@ -652,6 +713,7 @@ private:
void on_torrent_done()
{
std::for_each(std::begin(peers), std::end(peers), [](auto* const peer) { peer->set_interested(false); });
wishlist.reset();
}
void on_swarm_is_all_seeds()
@ -765,6 +827,7 @@ private:
s->cancel_all_requests_for_block(loc.block, peer);
peer->blocks_sent_to_client.add(tr_time(), 1);
tor->on_block_received(loc.block);
s->got_block.emit(tor, event.pieceIndex, loc.block);
}
break;
@ -907,6 +970,110 @@ private:
bool is_endgame_ = false;
};
bool tr_swarm::WishlistMediator::client_has_block(tr_block_index_t block) const
{
return tor_.has_block(block);
}
bool tr_swarm::WishlistMediator::client_wants_piece(tr_piece_index_t piece) const
{
return tor_.piece_is_wanted(piece);
}
bool tr_swarm::WishlistMediator::is_endgame() const
{
return swarm_.is_endgame();
}
bool tr_swarm::WishlistMediator::is_sequential_download() const
{
return tor_.is_sequential_download();
}
size_t tr_swarm::WishlistMediator::count_active_requests(tr_block_index_t block) const
{
return swarm_.active_requests.count(block);
}
size_t tr_swarm::WishlistMediator::count_missing_blocks(tr_piece_index_t piece) const
{
return tor_.count_missing_blocks_in_piece(piece);
}
size_t tr_swarm::WishlistMediator::count_piece_replication(tr_piece_index_t piece) const
{
return std::accumulate(
std::begin(swarm_.peers),
std::end(swarm_.peers),
size_t{},
[piece](size_t acc, tr_peer* peer) { return acc + (peer->hasPiece(piece) ? 1U : 0U); });
}
tr_block_span_t tr_swarm::WishlistMediator::block_span(tr_piece_index_t piece) const
{
return tor_.block_span_for_piece(piece);
}
tr_piece_index_t tr_swarm::WishlistMediator::piece_count() const
{
return tor_.piece_count();
}
tr_priority_t tr_swarm::WishlistMediator::priority(tr_piece_index_t piece) const
{
return tor_.piece_priority(piece);
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_peer_disconnect(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer)
{
return swarm_.peer_disconnect.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_got_bitfield(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer)
{
return swarm_.got_bitfield.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_got_block(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t, tr_block_index_t>::Observer observer)
{
return swarm_.got_block.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_got_have(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer)
{
return swarm_.got_have.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_got_have_all(
libtransmission::SimpleObservable<tr_torrent*>::Observer observer)
{
return swarm_.got_have_all.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_piece_completed(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer)
{
return tor_.piece_completed_.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_priority_changed(
libtransmission::SimpleObservable<tr_torrent*, tr_file_index_t const*, tr_file_index_t, tr_priority_t>::Observer observer)
{
return tor_.priority_changed_.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_sequential_download_changed(
libtransmission::SimpleObservable<tr_torrent*, bool>::Observer observer)
{
return tor_.sequential_download_changed_.observe(std::move(observer));
}
// ---
struct tr_peerMgr
{
private:
@ -1067,9 +1234,9 @@ void tr_peerMgrFree(tr_peerMgr* manager)
* This is used for cancelling requests that have been waiting
* for too long and avoiding duplicate requests.
*
* 2. tr_swarm::pieces, an array of "struct weighted_piece" which lists the
* pieces that we want to request. It's used to decide which blocks to
* return next when tr_peerMgrGetBlockRequests() is called.
* 2. tr_swarm::wishlist, a class that tracks the pieces that we want to
* request. It's used to decide which blocks to return next when
* tr_peerMgrGetNextRequests() is called.
*/
// --- struct block_request
@ -1087,77 +1254,16 @@ void tr_peerMgrClientSentRequests(tr_torrent* torrent, tr_peer* peer, tr_block_s
std::vector<tr_block_span_t> tr_peerMgrGetNextRequests(tr_torrent* torrent, tr_peer const* peer, size_t numwant)
{
class MediatorImpl final : public Wishlist::Mediator
TR_ASSERT(!torrent->is_done());
tr_swarm& swarm = *torrent->swarm;
if (!swarm.wishlist)
{
public:
MediatorImpl(tr_torrent const* torrent_in, tr_peer const* peer_in)
: torrent_{ torrent_in }
, swarm_{ torrent_in->swarm }
, peer_{ peer_in }
{
}
MediatorImpl(MediatorImpl&&) = delete;
MediatorImpl(MediatorImpl const&) = delete;
MediatorImpl& operator=(MediatorImpl&&) = delete;
MediatorImpl& operator=(MediatorImpl const&) = delete;
~MediatorImpl() override = default;
[[nodiscard]] bool clientCanRequestBlock(tr_block_index_t block) const override
{
return !torrent_->has_block(block) && !swarm_->active_requests.has(block, peer_);
}
[[nodiscard]] bool clientCanRequestPiece(tr_piece_index_t piece) const override
{
return torrent_->piece_is_wanted(piece) && peer_->hasPiece(piece);
}
[[nodiscard]] bool isEndgame() const override
{
return swarm_->is_endgame();
}
[[nodiscard]] size_t countActiveRequests(tr_block_index_t block) const override
{
return swarm_->active_requests.count(block);
}
[[nodiscard]] size_t countMissingBlocks(tr_piece_index_t piece) const override
{
return torrent_->count_missing_blocks_in_piece(piece);
}
[[nodiscard]] tr_block_span_t blockSpan(tr_piece_index_t piece) const override
{
return torrent_->block_span_for_piece(piece);
}
[[nodiscard]] tr_piece_index_t countAllPieces() const override
{
return torrent_->piece_count();
}
[[nodiscard]] tr_priority_t priority(tr_piece_index_t piece) const override
{
return torrent_->piece_priority(piece);
}
[[nodiscard]] bool isSequentialDownload() const override
{
return torrent_->is_sequential_download();
}
private:
tr_torrent const* const torrent_;
tr_swarm const* const swarm_;
tr_peer const* const peer_;
};
torrent->swarm->update_endgame();
auto const mediator = MediatorImpl{ torrent, peer };
return Wishlist{ mediator }.next(numwant);
swarm.wishlist = std::make_unique<Wishlist>(std::make_unique<tr_swarm::WishlistMediator>(swarm));
}
return swarm.wishlist->next(
numwant,
[peer](tr_piece_index_t p) { return peer->hasPiece(p); },
[peer, &swarm](tr_block_index_t b) { return swarm.active_requests.has(b, peer); });
}
// --- Piece List Manipulation / Accessors

View File

@ -1945,6 +1945,21 @@ tr_block_span_t tr_torrent::block_span_for_file(tr_file_index_t const file) cons
// ---
void tr_torrent::set_file_priorities(tr_file_index_t const* files, tr_file_index_t file_count, tr_priority_t priority)
{
if (std::any_of(
files,
files + file_count,
[this, priority](tr_file_index_t file) { return priority != file_priorities_.file_priority(file); }))
{
file_priorities_.set(files, file_count, priority);
priority_changed_.emit(this, files, file_count, priority);
set_dirty();
}
}
// ---
bool tr_torrent::check_piece(tr_piece_index_t const piece) const
{
auto const pass = tr_ioTestPiece(*this, piece);

View File

@ -421,16 +421,16 @@ struct tr_torrent final : public tr_completion::torrent_view
return file_priorities_.piece_priority(piece);
}
void set_file_priorities(tr_file_index_t const* files, tr_file_index_t file_count, tr_priority_t priority)
{
file_priorities_.set(files, file_count, priority);
set_dirty();
}
void set_file_priorities(tr_file_index_t const* files, tr_file_index_t file_count, tr_priority_t priority);
void set_file_priority(tr_file_index_t file, tr_priority_t priority)
{
file_priorities_.set(file, priority);
set_dirty();
if (priority != file_priorities_.file_priority(file))
{
file_priorities_.set(file, priority);
priority_changed_.emit(this, &file, 1U, priority);
set_dirty();
}
}
/// LOCATION
@ -728,9 +728,13 @@ struct tr_torrent final : public tr_completion::torrent_view
torrent's content than any other mime-type. */
[[nodiscard]] std::string_view primary_mime_type() const;
constexpr void set_sequential_download(bool is_sequential) noexcept
void set_sequential_download(bool is_sequential) noexcept
{
sequential_download_ = is_sequential;
if (is_sequential != sequential_download_)
{
sequential_download_ = is_sequential;
sequential_download_changed_.emit(this, is_sequential);
}
}
[[nodiscard]] constexpr auto is_sequential_download() const noexcept
@ -955,6 +959,8 @@ struct tr_torrent final : public tr_completion::torrent_view
libtransmission::SimpleObservable<tr_torrent*> started_;
libtransmission::SimpleObservable<tr_torrent*> stopped_;
libtransmission::SimpleObservable<tr_torrent*> swarm_is_all_seeds_;
libtransmission::SimpleObservable<tr_torrent*, tr_file_index_t const*, tr_file_index_t, tr_priority_t> priority_changed_;
libtransmission::SimpleObservable<tr_torrent*, bool> sequential_download_changed_;
CumulativeCount bytes_corrupt_;
CumulativeCount bytes_downloaded_;

File diff suppressed because it is too large Load Diff