
refactor: faster wishlist (#7027)

* chore: housekeeping

* perf: short circuit peer has block check

* refactor: track active requests in each respective peer

* refactor: swap `ActiveRequests` with new request tracking method

* refactor: use bitfield to store active requests per peer

* perf: check active request numbers first

* refactor: initialise candidate values in constructor

* refactor: better naming

* refactor: use `find_by_block()` more

* refactor: store wishlist mediator in swarm object

* test: make it compile

* test: update endgame test

* test: new test for choke event

* test: remove redundant lines

* test: new test for request event

* test: new test for reject event

* refactor: cache block have state in wishlist

* test: fix `gotBlockResortsPiece`

* fixup! refactor: track active requests in each respective peer

* fixup! test: fix `gotBlockResortsPiece`

* fix: count webseeds when calculating active requests

* build: update xcode project

* fix: add missing `candidates_dirty_` checks

* chore: remove old `depends-on` comments

* fixup! refactor: use bitfield to store active requests per peer

* refactor: extract block peer event to separate function

* perf: reorder conditions by overhead

* perf: check for completed block instead of completed piece

* chore: remove duplicated "unrequested piece" check

* refactor: merge similar block size sanity check

* refactor: use map to store number of requests in wishlist

* refactor: add asserts

* refactor: flush write buffer as soon as there is new data

* refactor: more accurate function naming

* fix: account for corrupt pieces in wishlist

* fix: account for unaligned blocks in wishlist

* Revert "fix: account for unaligned blocks in wishlist"

This reverts commit c3fce93cbae49c11d62e26caccedf55c1987aa95.

* fixup! refactor: use map to store number of requests in wishlist

* fix: account for unaligned blocks in wishlist v2

* chore: add `[[nodiscard]]`

* fixup! fix: account for unaligned blocks in wishlist v2

* fix: crash when handshake finishes in the middle of function
Yat Ho authored on 2024-11-12 09:30:00 +08:00 · committed by GitHub
parent 198ee5bd97
commit 7e4b4f10a1
17 changed files with 1023 additions and 931 deletions
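
The change at the heart of this commit is easiest to see outside the diff: the centralized `ActiveRequests` map (peer-mgr-active-requests.{cc,h}, deleted below) is replaced by a `tr_bitfield active_requests` owned by each peer, and swarm-wide counts are computed by summing those bitfields. A minimal standalone sketch of that counting step, with `std::vector<bool>` standing in for `tr_bitfield` and a reduced `PeerSketch` type in place of `tr_peer` (both names are illustrative, not from the codebase):

// Hypothetical sketch: per-peer request bitfields replace the old central map.
// In the real code, tr_swarm::WishlistMediator::count_active_requests() does a
// similar std::accumulate over swarm_.peers and swarm_.webseeds.
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

struct PeerSketch // illustrative stand-in for tr_peer
{
    std::vector<bool> active_requests; // one bit per block in the torrent
};

uint8_t count_active_requests(std::vector<PeerSketch> const& peers, std::size_t block)
{
    return std::accumulate(
        std::begin(peers),
        std::end(peers),
        uint8_t{},
        [block](uint8_t acc, PeerSketch const& peer)
        { return acc + (peer.active_requests[block] ? 1U : 0U); });
}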

View file

@ -451,8 +451,6 @@
ED67FB422B70FCE400D8A037 /* settings.cc in Sources */ = {isa = PBXBuildFile; fileRef = ED67FB402B70FCE400D8A037 /* settings.cc */; };
ED67FB432B70FCE400D8A037 /* settings.h in Headers */ = {isa = PBXBuildFile; fileRef = ED67FB412B70FCE400D8A037 /* settings.h */; };
ED86936F2ADAE34D00342B1A /* DefaultAppHelper.mm in Sources */ = {isa = PBXBuildFile; fileRef = ED86936E2ADAE34D00342B1A /* DefaultAppHelper.mm */; };
ED8A163F2735A8AA000D61F9 /* peer-mgr-active-requests.h in Headers */ = {isa = PBXBuildFile; fileRef = ED8A163B2735A8AA000D61F9 /* peer-mgr-active-requests.h */; };
ED8A16402735A8AA000D61F9 /* peer-mgr-active-requests.cc in Sources */ = {isa = PBXBuildFile; fileRef = ED8A163C2735A8AA000D61F9 /* peer-mgr-active-requests.cc */; };
ED8A16412735A8AA000D61F9 /* peer-mgr-wishlist.h in Headers */ = {isa = PBXBuildFile; fileRef = ED8A163D2735A8AA000D61F9 /* peer-mgr-wishlist.h */; };
ED8A16422735A8AA000D61F9 /* peer-mgr-wishlist.cc in Sources */ = {isa = PBXBuildFile; fileRef = ED8A163E2735A8AA000D61F9 /* peer-mgr-wishlist.cc */; };
ED9862972B979AA2002F3035 /* Utils.mm in Sources */ = {isa = PBXBuildFile; fileRef = ED9862962B979AA2002F3035 /* Utils.mm */; };
@ -1370,8 +1368,6 @@
ED67FB412B70FCE400D8A037 /* settings.h */ = {isa = PBXFileReference; explicitFileType = sourcecode.cpp.h; fileEncoding = 4; path = settings.h; sourceTree = "<group>"; };
ED86936D2ADAE34D00342B1A /* DefaultAppHelper.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = DefaultAppHelper.h; sourceTree = "<group>"; };
ED86936E2ADAE34D00342B1A /* DefaultAppHelper.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = DefaultAppHelper.mm; sourceTree = "<group>"; };
ED8A163B2735A8AA000D61F9 /* peer-mgr-active-requests.h */ = {isa = PBXFileReference; explicitFileType = sourcecode.cpp.h; fileEncoding = 4; path = "peer-mgr-active-requests.h"; sourceTree = "<group>"; };
ED8A163C2735A8AA000D61F9 /* peer-mgr-active-requests.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = "peer-mgr-active-requests.cc"; sourceTree = "<group>"; };
ED8A163D2735A8AA000D61F9 /* peer-mgr-wishlist.h */ = {isa = PBXFileReference; explicitFileType = sourcecode.cpp.h; fileEncoding = 4; path = "peer-mgr-wishlist.h"; sourceTree = "<group>"; };
ED8A163E2735A8AA000D61F9 /* peer-mgr-wishlist.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = "peer-mgr-wishlist.cc"; sourceTree = "<group>"; };
ED9862952B979AA2002F3035 /* Utils.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Utils.h; sourceTree = "<group>"; };
@ -1862,8 +1858,6 @@
A292A6E40DFB45E5004B9C0A /* peer-common.h */,
4D36BA650CA2F00800A63CA5 /* peer-io.cc */,
4D36BA660CA2F00800A63CA5 /* peer-io.h */,
ED8A163C2735A8AA000D61F9 /* peer-mgr-active-requests.cc */,
ED8A163B2735A8AA000D61F9 /* peer-mgr-active-requests.h */,
ED8A163E2735A8AA000D61F9 /* peer-mgr-wishlist.cc */,
ED8A163D2735A8AA000D61F9 /* peer-mgr-wishlist.h */,
4D36BA680CA2F00800A63CA5 /* peer-mgr.cc */,
@ -2402,7 +2396,6 @@
CCEBA596277340F6DF9F4482 /* session-alt-speeds.h in Headers */,
BEFC1E4E0C07861A00B0BB3C /* inout.h in Headers */,
BEFC1E520C07861A00B0BB3C /* open-files.h in Headers */,
ED8A163F2735A8AA000D61F9 /* peer-mgr-active-requests.h in Headers */,
BEFC1E550C07861A00B0BB3C /* completion.h in Headers */,
BEFC1E570C07861A00B0BB3C /* clients.h in Headers */,
A2BE9C530C1E4AF7002D16E6 /* makemeta.h in Headers */,
@ -3161,7 +3154,6 @@
EDBAAC8E29E486C200D9495F /* ip-cache.cc in Sources */,
BEFC1E2D0C07861A00B0BB3C /* port-forwarding-upnp.cc in Sources */,
A2AAB65C0DE0CF6200E04DDA /* rpc-server.cc in Sources */,
ED8A16402735A8AA000D61F9 /* peer-mgr-active-requests.cc in Sources */,
BEFC1E2F0C07861A00B0BB3C /* session.cc in Sources */,
CCEBA596277340F6DF9F4480 /* session-alt-speeds.cc in Sources */,
BEFC1E320C07861A00B0BB3C /* torrent.cc in Sources */,

View file

@ -82,8 +82,6 @@ target_sources(${TR_NAME}
peer-common.h
peer-io.cc
peer-io.h
peer-mgr-active-requests.cc
peer-mgr-active-requests.h
peer-mgr-wishlist.cc
peer-mgr-wishlist.h
peer-mgr.cc

View file

@ -34,6 +34,16 @@ struct tr_peer;
class tr_peer_event
{
[[nodiscard]] constexpr static auto BlockEvent(tr_block_info const& block_info, tr_block_index_t block) noexcept
{
auto const loc = block_info.block_loc(block);
auto event = tr_peer_event{};
event.pieceIndex = loc.piece;
event.offset = loc.piece_offset;
event.length = block_info.block_size(block);
return event;
}
public:
enum class Type
{
@ -49,7 +59,9 @@ public:
ClientGotHave,
ClientGotHaveAll,
ClientGotHaveNone,
ClientSentCancel,
ClientSentPieceData,
ClientSentRequest,
Error // generic
};
@ -64,12 +76,8 @@ public:
[[nodiscard]] constexpr static auto GotBlock(tr_block_info const& block_info, tr_block_index_t block) noexcept
{
auto const loc = block_info.block_loc(block);
auto event = tr_peer_event{};
auto event = BlockEvent(block_info, block);
event.type = Type::ClientGotBlock;
event.pieceIndex = loc.piece;
event.offset = loc.piece_offset;
event.length = block_info.block_size(block);
return event;
}
@ -144,12 +152,8 @@ public:
[[nodiscard]] constexpr static auto GotRejected(tr_block_info const& block_info, tr_block_index_t block) noexcept
{
auto const loc = block_info.block_loc(block);
auto event = tr_peer_event{};
auto event = BlockEvent(block_info, block);
event.type = Type::ClientGotRej;
event.pieceIndex = loc.piece;
event.offset = loc.piece_offset;
event.length = block_info.block_size(block);
return event;
}
@ -161,6 +165,13 @@ public:
return event;
}
[[nodiscard]] constexpr static auto SentCancel(tr_block_info const& block_info, tr_block_index_t block) noexcept
{
auto event = BlockEvent(block_info, block);
event.type = Type::ClientSentCancel;
return event;
}
[[nodiscard]] constexpr static auto SentPieceData(uint32_t length) noexcept
{
auto event = tr_peer_event{};
@ -168,6 +179,18 @@ public:
event.length = length;
return event;
}
[[nodiscard]] constexpr static auto SentRequest(tr_block_info const& block_info, tr_block_span_t block_span) noexcept
{
auto const loc_begin = block_info.block_loc(block_span.begin);
auto const loc_end = block_info.block_loc(block_span.end);
auto event = tr_peer_event{};
event.type = Type::ClientSentRequest;
event.pieceIndex = loc_begin.piece;
event.offset = loc_begin.piece_offset;
event.length = loc_end.byte - loc_begin.byte;
return event;
}
};
using tr_peer_callback_generic = void (*)(tr_peer* peer, tr_peer_event const& event, void* client_data);
@ -183,7 +206,7 @@ struct tr_peer
using Speed = libtransmission::Values::Speed;
explicit tr_peer(tr_torrent const& tor);
virtual ~tr_peer();
virtual ~tr_peer() = default;
[[nodiscard]] virtual Speed get_piece_speed(uint64_t now, tr_direction direction) const = 0;
@ -211,7 +234,7 @@ struct tr_peer
virtual void request_blocks(tr_block_span_t const* block_spans, size_t n_spans) = 0;
virtual void cancel_block_request(tr_block_index_t /*block*/)
virtual void maybe_cancel_block_request(tr_block_index_t /*block*/)
{
}
@ -225,6 +248,8 @@ struct tr_peer
tr_recentHistory<uint16_t> cancels_sent_to_client;
tr_bitfield active_requests;
/// The following fields are only to be used in peer-mgr.cc.
/// TODO(ckerr): refactor them out of `tr_peer`

View file

@ -591,6 +591,24 @@ size_t tr_peerIo::flush_outgoing_protocol_msgs()
return flush(TR_UP, byte_count);
}
void tr_peerIo::write_bytes(void const* bytes, size_t n_bytes, bool is_piece_data)
{
outbuf_info_.emplace_back(n_bytes, is_piece_data);
auto [resbuf, reslen] = outbuf_.reserve_space(n_bytes);
filter_.encrypt(reinterpret_cast<std::byte const*>(bytes), n_bytes, resbuf);
outbuf_.commit_space(n_bytes);
session_->queue_session_thread(
[ptr = std::weak_ptr{ shared_from_this() }]()
{
if (auto io = ptr.lock(); io)
{
io->try_write(SIZE_MAX);
}
});
}
// ---
size_t tr_peerIo::get_write_buffer_space(uint64_t now) const noexcept
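
The `write_bytes()` body moved out of the header (see the peer-io.h hunk below) and now also queues an immediate flush on the session thread; the `std::weak_ptr` capture keeps that deferred task safe if the `tr_peerIo` is destroyed before the session thread runs it. A reduced sketch of that idiom (the `IoSketch` type, `queue_flush`, and the explicit task queue are illustrative, not the transmission API):

// Illustrative sketch of the deferred-flush idiom used above: capture a
// weak_ptr to the IO object so the queued task becomes a no-op if the object
// has already been destroyed by the time the session thread runs it.
#include <functional>
#include <memory>
#include <vector>

struct IoSketch : std::enable_shared_from_this<IoSketch> // stand-in for tr_peerIo
{
    void try_write() { /* flush pending bytes */ }

    void queue_flush(std::vector<std::function<void()>>& session_thread_queue)
    {
        session_thread_queue.emplace_back(
            [weak = std::weak_ptr{ shared_from_this() }]()
            {
                if (auto io = weak.lock(); io) // still alive when the task runs?
                {
                    io->try_write();
                }
            });
    }
};
// Usage note: an IoSketch must be owned by a std::shared_ptr (e.g. created via
// std::make_shared) for shared_from_this() to be valid.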

View file

@ -142,14 +142,7 @@ public:
[[nodiscard]] size_t get_write_buffer_space(uint64_t now) const noexcept;
void write_bytes(void const* bytes, size_t n_bytes, bool is_piece_data)
{
outbuf_info_.emplace_back(n_bytes, is_piece_data);
auto [resbuf, reslen] = outbuf_.reserve_space(n_bytes);
filter_.encrypt(reinterpret_cast<std::byte const*>(bytes), n_bytes, resbuf);
outbuf_.commit_space(n_bytes);
}
void write_bytes(void const* bytes, size_t n_bytes, bool is_piece_data);
// Write all the data from `buf`.
// This is a destructive add: `buf` is empty after this call.

View file

@ -1,201 +0,0 @@
// This file Copyright © Mnemosyne LLC.
// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only),
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#include <algorithm>
#include <cstddef> // size_t
#include <ctime>
#include <memory>
#include <utility>
#include <unordered_map>
#include <vector>
#include <small/map.hpp>
#define LIBTRANSMISSION_PEER_MODULE
#include "libtransmission/transmission.h"
#include "libtransmission/peer-mgr-active-requests.h"
#include "libtransmission/peer-mgr-wishlist.h"
#include "libtransmission/tr-assert.h"
struct tr_peer;
class ActiveRequests::Impl
{
public:
[[nodiscard]] size_t size() const
{
return size_;
}
[[nodiscard]] size_t count(tr_peer const* peer) const
{
auto const it = count_.find(peer);
return it != std::end(count_) ? it->second : size_t{};
}
void incCount(tr_peer const* peer)
{
++count_[peer];
++size_;
}
void decCount(tr_peer const* peer)
{
auto it = count_.find(peer);
TR_ASSERT(it != std::end(count_));
TR_ASSERT(it->second > 0);
TR_ASSERT(size_ > 0);
if (it != std::end(count_))
{
if (--it->second == 0)
{
count_.erase(it);
}
--size_;
}
}
std::unordered_map<tr_peer const*, size_t> count_;
std::unordered_map<tr_block_index_t, small::map<tr_peer const*, time_t, Wishlist::EndgameMaxPeers>> blocks_;
private:
size_t size_ = 0;
};
ActiveRequests::ActiveRequests()
: impl_{ std::make_unique<Impl>() }
{
}
ActiveRequests::~ActiveRequests() = default;
bool ActiveRequests::add(tr_block_index_t block, tr_peer* peer, time_t when)
{
bool const added = impl_->blocks_[block].emplace(peer, when).second;
if (added)
{
impl_->incCount(peer);
}
return added;
}
// remove a request to `peer` for `block`
bool ActiveRequests::remove(tr_block_index_t block, tr_peer const* peer)
{
auto const it = impl_->blocks_.find(block);
auto const removed = it != std::end(impl_->blocks_) && it->second.erase(peer) != 0;
if (removed)
{
impl_->decCount(peer);
if (std::empty(it->second))
{
impl_->blocks_.erase(it);
}
}
return removed;
}
// remove requests to `peer` and return the associated blocks
std::vector<tr_block_index_t> ActiveRequests::remove(tr_peer const* peer)
{
auto removed = std::vector<tr_block_index_t>{};
removed.reserve(impl_->blocks_.size());
for (auto const& [block, peers_at] : impl_->blocks_)
{
if (peers_at.contains(peer))
{
removed.push_back(block);
}
}
for (auto block : removed)
{
remove(block, peer);
}
return removed;
}
// remove requests for `block` and return the associated peers
std::vector<tr_peer*> ActiveRequests::remove(tr_block_index_t block)
{
auto removed = std::vector<tr_peer*>{};
if (auto it = impl_->blocks_.find(block); it != std::end(impl_->blocks_))
{
auto const n = std::size(it->second);
removed.resize(n);
std::transform(
std::begin(it->second),
std::end(it->second),
std::begin(removed),
[](auto const& iter) { return const_cast<tr_peer*>(iter.first); });
impl_->blocks_.erase(block);
}
for (auto const* const peer : removed)
{
impl_->decCount(peer);
}
return removed;
}
// return true if there's an active request to `peer` for `block`
bool ActiveRequests::has(tr_block_index_t block, tr_peer const* peer) const
{
auto const iter = impl_->blocks_.find(block);
return iter != std::end(impl_->blocks_) && iter->second.contains(peer);
}
// count how many peers we're asking for `block`
size_t ActiveRequests::count(tr_block_index_t block) const
{
auto const& blocks = impl_->blocks_;
auto const iter = blocks.find(block);
return iter == std::end(blocks) ? 0U : std::size(iter->second);
}
// count how many active block requests we have to `peer`
size_t ActiveRequests::count(tr_peer const* peer) const
{
return impl_->count(peer);
}
// return the total number of active requests
size_t ActiveRequests::size() const
{
return impl_->size();
}
// returns the active requests sent before `when`
std::vector<std::pair<tr_block_index_t, tr_peer*>> ActiveRequests::sentBefore(time_t when) const
{
auto sent_before = std::vector<std::pair<tr_block_index_t, tr_peer*>>{};
sent_before.reserve(std::size(impl_->blocks_));
for (auto const& [block, peers_at] : impl_->blocks_)
{
for (auto const& [peer, sent_at] : peers_at)
{
if (sent_at < when)
{
sent_before.emplace_back(block, const_cast<tr_peer*>(peer));
}
}
}
return sent_before;
}

View file

@ -1,62 +0,0 @@
// This file Copyright © Mnemosyne LLC.
// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only),
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#pragma once
#ifndef LIBTRANSMISSION_PEER_MODULE
#error only the libtransmission peer module should #include this header.
#endif
#include <cstddef> // size_t
#include <ctime> // time_t
#include <memory>
#include <utility>
#include <vector>
#include "libtransmission/transmission.h" // tr_block_index_t
struct tr_peer;
/**
* Bookkeeping for the active requests we have --
* e.g. the requests we've sent and are awaiting a response.
*/
class ActiveRequests
{
public:
ActiveRequests();
~ActiveRequests();
// record that we've requested `block` from `peer`
bool add(tr_block_index_t block, tr_peer* peer, time_t when);
// erase any record of a request for `block` from `peer`
bool remove(tr_block_index_t block, tr_peer const* peer);
// erase any record of requests to `peer` and return the previously-associated blocks
std::vector<tr_block_index_t> remove(tr_peer const* peer);
// erase any record of requests to `block` and return the previously-associated peers
std::vector<tr_peer*> remove(tr_block_index_t block);
// return true if there's a record of a request for `block` from `peer`
[[nodiscard]] bool has(tr_block_index_t block, tr_peer const* peer) const;
// count how many peers we're asking for `block`
[[nodiscard]] size_t count(tr_block_index_t block) const;
// count how many active block requests we have to `peer`
[[nodiscard]] size_t count(tr_peer const* peer) const;
// return the total number of active requests
[[nodiscard]] size_t size() const;
// returns the active requests sent before `when`
[[nodiscard]] std::vector<std::pair<tr_block_index_t, tr_peer*>> sentBefore(time_t when) const;
private:
class Impl;
std::unique_ptr<Impl> const impl_;
};

View file

@ -9,6 +9,7 @@
#include <utility>
#include <vector>
#include <small/map.hpp>
#include <small/vector.hpp>
#define LIBTRANSMISSION_PEER_MODULE
@ -19,9 +20,14 @@
#include "libtransmission/crypto-utils.h" // for tr_salt_shaker
#include "libtransmission/peer-mgr-wishlist.h"
// Asserts in this file are expensive, so hide them in #ifdef
#ifdef TR_WISHLIST_ASSERT
#include "libtransmission/tr-assert.h"
#endif
namespace
{
std::vector<tr_block_span_t> make_spans(small::vector<tr_block_index_t> const& blocks)
[[nodiscard]] std::vector<tr_block_span_t> make_spans(small::vector<tr_block_index_t> const& blocks)
{
if (std::empty(blocks))
{
@ -55,18 +61,22 @@ class Wishlist::Impl
{
struct Candidate
{
Candidate(
tr_piece_index_t piece_in,
size_t replication_in,
tr_priority_t priority_in,
tr_piece_index_t salt_in,
Mediator const* mediator)
Candidate(tr_piece_index_t piece_in, tr_piece_index_t salt_in, Mediator const* mediator)
: piece{ piece_in }
, replication{ replication_in }
, priority{ priority_in }
, block_span{ mediator->block_span(piece_in) }
, replication{ mediator->count_piece_replication(piece_in) }
, priority{ mediator->priority(piece_in) }
, salt{ salt_in }
, mediator_{ mediator }
{
n_reqs.reserve(block_span.end - block_span.begin);
for (auto [block, end] = block_span; block < end; ++block)
{
if (!mediator_->client_has_block(block))
{
n_reqs.try_emplace(block, mediator_->count_active_requests(block));
}
}
}
[[nodiscard]] int compare(Candidate const& that) const noexcept; // <=>
@ -77,6 +87,9 @@ class Wishlist::Impl
}
tr_piece_index_t piece;
tr_block_span_t block_span;
small::map<tr_block_index_t, uint8_t> n_reqs;
// Caching the following 2 values are highly beneficial, because:
// - they are often used (mainly because resort_piece() is called
@ -96,12 +109,12 @@ class Wishlist::Impl
using CandidateVec = std::vector<Candidate>;
public:
explicit Impl(std::unique_ptr<Mediator> mediator_in);
explicit Impl(Mediator& mediator_in);
std::vector<tr_block_span_t> next(
[[nodiscard]] std::vector<tr_block_span_t> next(
size_t n_wanted_blocks,
std::function<bool(tr_piece_index_t)> const& peer_has_piece,
std::function<bool(tr_block_index_t)> const& has_active_pending_to_peer);
std::function<bool(tr_block_index_t)> const& has_active_request_to_peer);
private:
constexpr void set_candidates_dirty() noexcept
@ -122,7 +135,7 @@ private:
}
}
TR_CONSTEXPR20 void dec_replication_from_bitfield(tr_bitfield const& bitfield)
TR_CONSTEXPR20 void dec_replication_bitfield(tr_bitfield const& bitfield)
{
if (candidates_dirty_)
{
@ -162,7 +175,7 @@ private:
}
}
void inc_replication_from_bitfield(tr_bitfield const& bitfield)
void inc_replication_bitfield(tr_bitfield const& bitfield)
{
if (candidates_dirty_)
{
@ -198,7 +211,7 @@ private:
return;
}
if (auto iter = piece_lookup(piece); iter != std::end(candidates_))
if (auto iter = find_by_piece(piece); iter != std::end(candidates_))
{
++iter->replication;
resort_piece(iter);
@ -207,12 +220,115 @@ private:
// ---
TR_CONSTEXPR20 CandidateVec::iterator piece_lookup(tr_piece_index_t const piece)
TR_CONSTEXPR20 void inc_active_request_span(tr_block_span_t block_span)
{
if (candidates_dirty_)
{
return;
}
for (auto block = block_span.begin; block < block_span.end;)
{
auto it_p = find_by_block(block);
if (it_p == std::end(candidates_))
{
set_candidates_dirty();
break;
}
auto& n_reqs = it_p->n_reqs;
auto it_b_begin = std::begin(n_reqs);
it_b_begin = it_b_begin->first >= block_span.begin ? it_b_begin : n_reqs.lower_bound(block_span.begin);
auto it_b_end = std::end(n_reqs);
it_b_end = std::prev(it_b_end)->first < block_span.end ? it_b_end : n_reqs.lower_bound(block_span.end);
for (auto it_b = it_b_begin; it_b != it_b_end; ++it_b)
{
++it_b->second;
}
block = it_p->block_span.end;
}
}
TR_CONSTEXPR20 void dec_active_request_block(tr_block_index_t block)
{
if (candidates_dirty_)
{
return;
}
if (auto it_p = find_by_block(block); it_p != std::end(candidates_))
{
auto& n_reqs = it_p->n_reqs;
if (auto it_b = n_reqs.find(block); it_b != std::end(n_reqs) && it_b->second > 0U)
{
--it_b->second;
}
}
}
TR_CONSTEXPR20 void dec_active_request_bitfield(tr_bitfield const& requests)
{
if (candidates_dirty_)
{
return;
}
for (auto& candidate : candidates_)
{
for (auto& [block, n_req] : candidate.n_reqs)
{
if (n_req > 0U && requests.test(block))
{
--n_req;
}
}
}
}
// ---
TR_CONSTEXPR20 void client_got_block(tr_block_index_t block)
{
if (candidates_dirty_)
{
return;
}
if (auto iter = find_by_block(block); iter != std::end(candidates_))
{
iter->n_reqs.erase(block);
resort_piece(iter);
}
}
// ---
TR_CONSTEXPR20 void peer_disconnect(tr_bitfield const& have, tr_bitfield const& requests)
{
dec_replication_bitfield(have);
dec_active_request_bitfield(requests);
}
// ---
[[nodiscard]] TR_CONSTEXPR20 CandidateVec::iterator find_by_piece(tr_piece_index_t const piece)
{
return std::find_if(
std::begin(candidates_),
std::end(candidates_),
[piece](auto const& candidate) { return candidate.piece == piece; });
[piece](auto const& c) { return c.piece == piece; });
}
[[nodiscard]] TR_CONSTEXPR20 CandidateVec::iterator find_by_block(tr_block_index_t const block)
{
return std::find_if(
std::begin(candidates_),
std::end(candidates_),
[block](auto const& c) { return c.block_span.begin <= block && block < c.block_span.end; });
}
void maybe_rebuild_candidate_list()
@ -225,23 +341,18 @@ private:
candidates_.clear();
auto salter = tr_salt_shaker<tr_piece_index_t>{};
auto const is_sequential = mediator_->is_sequential_download();
auto const n_pieces = mediator_->piece_count();
auto const is_sequential = mediator_.is_sequential_download();
auto const n_pieces = mediator_.piece_count();
candidates_.reserve(n_pieces);
for (tr_piece_index_t piece = 0U; piece < n_pieces; ++piece)
{
if (mediator_->count_missing_blocks(piece) <= 0U || !mediator_->client_wants_piece(piece))
if (mediator_.client_has_piece(piece) || !mediator_.client_wants_piece(piece))
{
continue;
}
auto const salt = is_sequential ? piece : salter();
candidates_.emplace_back(
piece,
mediator_->count_piece_replication(piece),
mediator_->priority(piece),
salt,
mediator_.get());
candidates_.emplace_back(piece, salt, &mediator_);
}
std::sort(std::begin(candidates_), std::end(candidates_));
}
@ -253,25 +364,12 @@ private:
return;
}
if (auto iter = piece_lookup(piece); iter != std::end(candidates_))
if (auto iter = find_by_piece(piece); iter != std::end(candidates_))
{
candidates_.erase(iter);
}
}
TR_CONSTEXPR20 void resort_piece(tr_piece_index_t const piece)
{
if (candidates_dirty_)
{
return;
}
if (auto iter = piece_lookup(piece); iter != std::end(candidates_))
{
resort_piece(iter);
}
}
TR_CONSTEXPR20 void resort_piece(CandidateVec::iterator const pos_old)
{
if (candidates_dirty_)
@ -297,32 +395,39 @@ private:
CandidateVec candidates_;
bool candidates_dirty_ = true;
bool is_endgame_ = false;
std::array<libtransmission::ObserverTag, 8U> const tags_;
std::array<libtransmission::ObserverTag, 13U> const tags_;
std::unique_ptr<Mediator> const mediator_;
Mediator& mediator_;
};
Wishlist::Impl::Impl(std::unique_ptr<Mediator> mediator_in)
Wishlist::Impl::Impl(Mediator& mediator_in)
: tags_{ {
mediator_in->observe_peer_disconnect([this](tr_torrent*, tr_bitfield const& b) { dec_replication_from_bitfield(b); }),
mediator_in->observe_got_bitfield([this](tr_torrent*, tr_bitfield const& b) { inc_replication_from_bitfield(b); }),
mediator_in->observe_got_block([this](tr_torrent*, tr_piece_index_t p, tr_block_index_t) { resort_piece(p); }),
mediator_in->observe_got_have([this](tr_torrent*, tr_piece_index_t p) { inc_replication_piece(p); }),
mediator_in->observe_got_have_all([this](tr_torrent*) { inc_replication(); }),
mediator_in->observe_piece_completed([this](tr_torrent*, tr_piece_index_t p) { remove_piece(p); }),
mediator_in->observe_priority_changed([this](tr_torrent*, tr_file_index_t const*, tr_file_index_t, tr_priority_t)
{ set_candidates_dirty(); }),
mediator_in->observe_sequential_download_changed([this](tr_torrent*, bool) { set_candidates_dirty(); }),
mediator_in.observe_peer_disconnect([this](tr_torrent*, tr_bitfield const& b, tr_bitfield const& ar)
{ peer_disconnect(b, ar); }),
mediator_in.observe_got_bad_piece([this](tr_torrent*, tr_piece_index_t) { set_candidates_dirty(); }),
mediator_in.observe_got_bitfield([this](tr_torrent*, tr_bitfield const& b) { inc_replication_bitfield(b); }),
mediator_in.observe_got_block([this](tr_torrent*, tr_block_index_t b) { client_got_block(b); }),
mediator_in.observe_got_choke([this](tr_torrent*, tr_bitfield const& b) { dec_active_request_bitfield(b); }),
mediator_in.observe_got_have([this](tr_torrent*, tr_piece_index_t p) { inc_replication_piece(p); }),
mediator_in.observe_got_have_all([this](tr_torrent*) { inc_replication(); }),
mediator_in.observe_got_reject([this](tr_torrent*, tr_peer*, tr_block_index_t b) { dec_active_request_block(b); }),
mediator_in.observe_piece_completed([this](tr_torrent*, tr_piece_index_t p) { remove_piece(p); }),
mediator_in.observe_priority_changed([this](tr_torrent*, tr_file_index_t const*, tr_file_index_t, tr_priority_t)
{ set_candidates_dirty(); }),
mediator_in.observe_sent_cancel([this](tr_torrent*, tr_peer*, tr_block_index_t b) { dec_active_request_block(b); }),
mediator_in.observe_sent_request([this](tr_torrent*, tr_peer*, tr_block_span_t bs) { inc_active_request_span(bs); }),
mediator_in.observe_sequential_download_changed([this](tr_torrent*, bool) { set_candidates_dirty(); }),
} }
, mediator_{ std::move(mediator_in) }
, mediator_{ mediator_in }
{
}
std::vector<tr_block_span_t> Wishlist::Impl::next(
size_t n_wanted_blocks,
std::function<bool(tr_piece_index_t)> const& peer_has_piece,
std::function<bool(tr_block_index_t)> const& has_active_pending_to_peer)
std::function<bool(tr_block_index_t)> const& has_active_request_to_peer)
{
if (n_wanted_blocks == 0U)
{
@ -331,6 +436,7 @@ std::vector<tr_block_span_t> Wishlist::Impl::next(
maybe_rebuild_candidate_list();
auto const max_peers = is_endgame_ ? EndgameMaxPeers : NormalMaxPeers;
auto blocks = small::vector<tr_block_index_t>{};
blocks.reserve(n_wanted_blocks);
for (auto const& candidate : candidates_)
@ -342,26 +448,34 @@ std::vector<tr_block_span_t> Wishlist::Impl::next(
}
// if the peer doesn't have this piece that we want...
if (!peer_has_piece(candidate.piece))
if (candidate.replication == 0 || !peer_has_piece(candidate.piece))
{
continue;
}
// walk the blocks in this piece
for (auto [block, end] = mediator_->block_span(candidate.piece); block < end && std::size(blocks) < n_wanted_blocks;
++block)
// walk the blocks in this piece that we don't have
for (auto const& [block, n_req] : candidate.n_reqs)
{
// don't request blocks that:
// 1. we've already got, or
// 2. already has an active request to that peer
if (mediator_->client_has_block(block) || has_active_pending_to_peer(block))
if (std::size(blocks) >= n_wanted_blocks)
{
break;
}
#ifdef TR_WISHLIST_ASSERT
auto const n_req_truth = mediator_.count_active_requests(block);
TR_ASSERT_MSG(
n_req == n_req_truth,
fmt::format("piece = {}, block = {}, n_req = {}, truth = {}", candidate.piece, block, n_req, n_req_truth));
#endif
// don't request from too many peers
if (n_req >= max_peers)
{
continue;
}
// don't request from too many peers
auto const n_peers = mediator_->count_active_requests(block);
if (auto const max_peers = mediator_->is_endgame() ? EndgameMaxPeers : NormalMaxPeers; n_peers >= max_peers)
// don't request block from peers which we already requested from
if (has_active_request_to_peer(block))
{
continue;
}
@ -370,6 +484,8 @@ std::vector<tr_block_span_t> Wishlist::Impl::next(
}
}
is_endgame_ = std::size(blocks) < n_wanted_blocks;
// Ensure the list of blocks are sorted
// The list needs to be unique as well, but that should come naturally
std::sort(std::begin(blocks), std::end(blocks));
@ -379,8 +495,7 @@ std::vector<tr_block_span_t> Wishlist::Impl::next(
int Wishlist::Impl::Candidate::compare(Wishlist::Impl::Candidate const& that) const noexcept
{
// prefer pieces closer to completion
if (auto const val = tr_compare_3way(mediator_->count_missing_blocks(piece), mediator_->count_missing_blocks(that.piece));
val != 0)
if (auto const val = tr_compare_3way(std::size(n_reqs), std::size(that.n_reqs)); val != 0)
{
return val;
}
@ -402,8 +517,8 @@ int Wishlist::Impl::Candidate::compare(Wishlist::Impl::Candidate const& that) co
// ---
Wishlist::Wishlist(std::unique_ptr<Mediator> mediator_in)
: impl_{ std::make_unique<Impl>(std::move(mediator_in)) }
Wishlist::Wishlist(Mediator& mediator_in)
: impl_{ std::make_unique<Impl>(mediator_in) }
{
}
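
The reworked `Candidate` above caches a per-block active-request count (`n_reqs`) so that `next()` no longer calls back into the mediator for every block, and the number of entries doubles as the count of missing blocks. A rough standalone model of that bookkeeping, assuming `std::map` in place of `small::map` and plain callables in place of the `Mediator` interface (the `CandidateSketch` name and its parameters are illustrative):

// Sketch of the wishlist's per-piece bookkeeping after the refactor: each
// candidate records, for every block it still needs, how many requests are
// outstanding. "Closer to completion" is then simply a smaller map.
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <utility>

struct CandidateSketch // simplified stand-in for Wishlist::Impl::Candidate
{
    std::size_t piece = 0;
    std::map<std::size_t, uint8_t> n_reqs; // block -> current active request count

    CandidateSketch(
        std::size_t piece_in,
        std::pair<std::size_t, std::size_t> block_span,
        std::function<bool(std::size_t)> const& client_has_block,
        std::function<uint8_t(std::size_t)> const& count_active_requests)
        : piece{ piece_in }
    {
        for (auto block = block_span.first; block < block_span.second; ++block)
        {
            if (!client_has_block(block))
            {
                n_reqs.try_emplace(block, count_active_requests(block));
            }
        }
    }

    // only the first comparison criterion is modeled here; the real compare()
    // also considers priority, replication, and a salt for tie-breaking
    bool operator<(CandidateSketch const& that) const noexcept
    {
        return n_reqs.size() < that.n_reqs.size();
    }
};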

View file

@ -20,6 +20,7 @@
#include "libtransmission/utils.h"
class tr_bitfield;
struct tr_peer;
/**
* Figures out what blocks we want to request next.
@ -33,38 +34,47 @@ public:
struct Mediator
{
[[nodiscard]] virtual bool client_has_block(tr_block_index_t block) const = 0;
[[nodiscard]] virtual bool client_has_piece(tr_piece_index_t piece) const = 0;
[[nodiscard]] virtual bool client_wants_piece(tr_piece_index_t piece) const = 0;
[[nodiscard]] virtual bool is_endgame() const = 0;
[[nodiscard]] virtual bool is_sequential_download() const = 0;
[[nodiscard]] virtual size_t count_active_requests(tr_block_index_t block) const = 0;
[[nodiscard]] virtual size_t count_missing_blocks(tr_piece_index_t piece) const = 0;
[[nodiscard]] virtual uint8_t count_active_requests(tr_block_index_t block) const = 0;
[[nodiscard]] virtual size_t count_piece_replication(tr_piece_index_t piece) const = 0;
[[nodiscard]] virtual tr_block_span_t block_span(tr_piece_index_t piece) const = 0;
[[nodiscard]] virtual tr_piece_index_t piece_count() const = 0;
[[nodiscard]] virtual tr_priority_t priority(tr_piece_index_t piece) const = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_peer_disconnect(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer) = 0;
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&, tr_bitfield const&>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_got_bad_piece(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_got_bitfield(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_got_block(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t, tr_block_index_t>::Observer observer) = 0;
libtransmission::SimpleObservable<tr_torrent*, tr_block_index_t>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_got_choke(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_got_have(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_got_have_all(
libtransmission::SimpleObservable<tr_torrent*>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_got_reject(
libtransmission::SimpleObservable<tr_torrent*, tr_peer*, tr_block_index_t>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_piece_completed(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_priority_changed(
libtransmission::SimpleObservable<tr_torrent*, tr_file_index_t const*, tr_file_index_t, tr_priority_t>::Observer
observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_sent_cancel(
libtransmission::SimpleObservable<tr_torrent*, tr_peer*, tr_block_index_t>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_sent_request(
libtransmission::SimpleObservable<tr_torrent*, tr_peer*, tr_block_span_t>::Observer observer) = 0;
[[nodiscard]] virtual libtransmission::ObserverTag observe_sequential_download_changed(
libtransmission::SimpleObservable<tr_torrent*, bool>::Observer observer) = 0;
virtual ~Mediator() = default;
};
explicit Wishlist(std::unique_ptr<Mediator> mediator_in);
explicit Wishlist(Mediator& mediator_in);
~Wishlist();
// the next blocks that we should request from a peer

View file

@ -39,7 +39,6 @@
#include "libtransmission/observable.h"
#include "libtransmission/peer-common.h"
#include "libtransmission/peer-io.h"
#include "libtransmission/peer-mgr-active-requests.h"
#include "libtransmission/peer-mgr-wishlist.h"
#include "libtransmission/peer-mgr.h"
#include "libtransmission/peer-msgs.h"
@ -312,31 +311,40 @@ public:
}
[[nodiscard]] bool client_has_block(tr_block_index_t block) const override;
[[nodiscard]] bool client_has_piece(tr_piece_index_t piece) const override;
[[nodiscard]] bool client_wants_piece(tr_piece_index_t piece) const override;
[[nodiscard]] bool is_endgame() const override;
[[nodiscard]] bool is_sequential_download() const override;
[[nodiscard]] size_t count_active_requests(tr_block_index_t block) const override;
[[nodiscard]] size_t count_missing_blocks(tr_piece_index_t piece) const override;
[[nodiscard]] uint8_t count_active_requests(tr_block_index_t block) const override;
[[nodiscard]] size_t count_piece_replication(tr_piece_index_t piece) const override;
[[nodiscard]] tr_block_span_t block_span(tr_piece_index_t piece) const override;
[[nodiscard]] tr_piece_index_t piece_count() const override;
[[nodiscard]] tr_priority_t priority(tr_piece_index_t piece) const override;
[[nodiscard]] libtransmission::ObserverTag observe_peer_disconnect(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer) override;
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&, tr_bitfield const&>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_got_bad_piece(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_got_bitfield(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_got_block(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t, tr_block_index_t>::Observer observer) override;
libtransmission::SimpleObservable<tr_torrent*, tr_block_index_t>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_got_choke(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_got_have(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_got_have_all(
libtransmission::SimpleObservable<tr_torrent*>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_got_reject(
libtransmission::SimpleObservable<tr_torrent*, tr_peer*, tr_block_index_t>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_piece_completed(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_priority_changed(
libtransmission::SimpleObservable<tr_torrent*, tr_file_index_t const*, tr_file_index_t, tr_priority_t>::Observer
observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_sent_cancel(
libtransmission::SimpleObservable<tr_torrent*, tr_peer*, tr_block_index_t>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_sent_request(
libtransmission::SimpleObservable<tr_torrent*, tr_peer*, tr_block_span_t>::Observer observer) override;
[[nodiscard]] libtransmission::ObserverTag observe_sequential_download_changed(
libtransmission::SimpleObservable<tr_torrent*, bool>::Observer observer) override;
@ -379,18 +387,6 @@ public:
TR_ASSERT(std::empty(peers));
}
void cancel_old_requests()
{
auto const now = tr_time();
auto const oldest = now - RequestTtlSecs;
for (auto const& [block, peer] : active_requests.sentBefore(oldest))
{
maybe_send_cancel_request(peer, block, nullptr);
active_requests.remove(block, peer);
}
}
[[nodiscard]] uint16_t count_active_webseeds(uint64_t now) const noexcept
{
if (!tor->is_running() || tor->is_done())
@ -413,7 +409,7 @@ public:
{
auto const lock = unique_lock();
peer_disconnect.emit(tor, peer->has());
peer_disconnect.emit(tor, peer->has(), peer->active_requests);
auto const& peer_info = peer->peer_info;
TR_ASSERT(peer_info);
@ -442,18 +438,6 @@ public:
TR_ASSERT(stats.peer_count == 0);
}
void update_endgame()
{
/* we consider ourselves to be in endgame if the number of bytes
we've got requested is >= the number of bytes left to download */
is_endgame_ = uint64_t(std::size(active_requests)) * tr_block_info::BlockSize >= tor->left_until_done();
}
[[nodiscard]] constexpr auto is_endgame() const noexcept
{
return is_endgame_;
}
[[nodiscard]] TR_CONSTEXPR20 auto is_all_upload_only() const noexcept
{
if (!pool_is_all_upload_only_)
@ -513,6 +497,14 @@ public:
switch (event.type)
{
case tr_peer_event::Type::ClientSentCancel:
{
auto* const tor = s->tor;
auto const loc = tor->piece_loc(event.pieceIndex, event.offset);
s->sent_cancel.emit(tor, msgs, loc.block);
}
break;
case tr_peer_event::Type::ClientSentPieceData:
{
auto* const tor = s->tor;
@ -549,7 +541,7 @@ public:
break;
case tr_peer_event::Type::ClientGotChoke:
s->active_requests.remove(msgs);
s->got_choke.emit(s->tor, msgs->active_requests);
break;
case tr_peer_event::Type::ClientGotPort:
@ -595,11 +587,16 @@ public:
}
}
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&> peer_disconnect;
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const& /*bitfield*/, tr_bitfield const& /*active requests*/>
peer_disconnect;
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&> got_bitfield;
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t, tr_block_index_t> got_block;
libtransmission::SimpleObservable<tr_torrent*, tr_block_index_t> got_block;
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&> got_choke;
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t> got_have;
libtransmission::SimpleObservable<tr_torrent*> got_have_all;
libtransmission::SimpleObservable<tr_torrent*, tr_peer*, tr_block_index_t> got_reject;
libtransmission::SimpleObservable<tr_torrent*, tr_peer*, tr_block_index_t> sent_cancel;
libtransmission::SimpleObservable<tr_torrent*, tr_peer*, tr_block_span_t> sent_request;
mutable tr_swarm_stats stats = {};
@ -611,14 +608,12 @@ public:
tr_torrent* const tor;
ActiveRequests active_requests;
// depends-on: active_requests
std::vector<std::unique_ptr<tr_webseed>> webseeds;
// depends-on: active_requests
Peers peers;
// depends-on: tor
WishlistMediator wishlist_mediator{ *this };
std::unique_ptr<Wishlist> wishlist;
Pool connectable_pool;
@ -671,13 +666,13 @@ private:
{
if (peer != nullptr && peer != muted)
{
peer->cancel_block_request(block);
peer->maybe_cancel_block_request(block);
}
}
void cancel_all_requests_for_block(tr_block_index_t block, tr_peer const* no_notify)
{
for (auto* peer : active_requests.remove(block))
for (auto* peer : peers)
{
maybe_send_cancel_request(peer, block, no_notify);
}
@ -805,8 +800,21 @@ private:
{
switch (event.type)
{
case tr_peer_event::Type::ClientSentRequest:
{
auto* const tor = s->tor;
auto const loc_begin = tor->piece_loc(event.pieceIndex, event.offset);
auto const loc_end = tor->piece_loc(event.pieceIndex, event.offset, event.length);
s->sent_request.emit(tor, peer, { loc_begin.block, loc_end.block });
}
break;
case tr_peer_event::Type::ClientGotRej:
s->active_requests.remove(s->tor->piece_loc(event.pieceIndex, event.offset).block, peer);
{
auto* const tor = s->tor;
auto const loc = tor->piece_loc(event.pieceIndex, event.offset);
s->got_reject.emit(tor, peer, loc.block);
}
break;
case tr_peer_event::Type::ClientGotBlock:
@ -817,7 +825,7 @@ private:
peer->blocks_sent_to_client.add(tr_time(), 1);
peer->blame.set(loc.piece);
tor->on_block_received(loc.block);
s->got_block.emit(tor, event.pieceIndex, loc.block);
s->got_block.emit(tor, loc.block);
}
break;
@ -907,14 +915,9 @@ EXIT:
// number of bad pieces a peer is allowed to send before we ban them
static auto constexpr MaxBadPiecesPerPeer = 5U;
// how long we'll let requests we've made linger before we cancel them
static auto constexpr RequestTtlSecs = 90;
std::array<libtransmission::ObserverTag, 8> const tags_;
mutable std::optional<bool> pool_is_all_upload_only_;
bool is_endgame_ = false;
};
bool tr_swarm::WishlistMediator::client_has_block(tr_block_index_t block) const
@ -922,29 +925,29 @@ bool tr_swarm::WishlistMediator::client_has_block(tr_block_index_t block) const
return tor_.has_block(block);
}
bool tr_swarm::WishlistMediator::client_has_piece(tr_piece_index_t piece) const
{
return tor_.has_blocks(block_span(piece));
}
bool tr_swarm::WishlistMediator::client_wants_piece(tr_piece_index_t piece) const
{
return tor_.piece_is_wanted(piece);
}
bool tr_swarm::WishlistMediator::is_endgame() const
{
return swarm_.is_endgame();
}
bool tr_swarm::WishlistMediator::is_sequential_download() const
{
return tor_.is_sequential_download();
}
size_t tr_swarm::WishlistMediator::count_active_requests(tr_block_index_t block) const
uint8_t tr_swarm::WishlistMediator::count_active_requests(tr_block_index_t block) const
{
return swarm_.active_requests.count(block);
}
size_t tr_swarm::WishlistMediator::count_missing_blocks(tr_piece_index_t piece) const
{
return tor_.count_missing_blocks_in_piece(piece);
auto const op = [block](uint8_t acc, auto const& peer)
{
return acc + (peer->active_requests.test(block) ? 1U : 0U);
};
return std::accumulate(std::begin(swarm_.peers), std::end(swarm_.peers), uint8_t{}, op) +
std::accumulate(std::begin(swarm_.webseeds), std::end(swarm_.webseeds), uint8_t{}, op);
}
size_t tr_swarm::WishlistMediator::count_piece_replication(tr_piece_index_t piece) const
@ -959,7 +962,17 @@ size_t tr_swarm::WishlistMediator::count_piece_replication(tr_piece_index_t piec
tr_block_span_t tr_swarm::WishlistMediator::block_span(tr_piece_index_t piece) const
{
return tor_.block_span_for_piece(piece);
auto span = tor_.block_span_for_piece(piece);
// Overlapping block spans caused by blocks unaligned to piece boundaries
// might cause redundant block requests to be sent out, so detect it and
// ensure that block spans within the wishlist do not overlap.
if (auto const is_unaligned_piece = tor_.block_loc(span.begin).piece != piece; is_unaligned_piece)
{
++span.begin;
}
return span;
}
tr_piece_index_t tr_swarm::WishlistMediator::piece_count() const
@ -973,11 +986,17 @@ tr_priority_t tr_swarm::WishlistMediator::priority(tr_piece_index_t piece) const
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_peer_disconnect(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer)
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&, tr_bitfield const&>::Observer observer)
{
return swarm_.peer_disconnect.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_got_bad_piece(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer)
{
return tor_.got_bad_piece_.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_got_bitfield(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer)
{
@ -985,11 +1004,17 @@ libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_got_bitfield(
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_got_block(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t, tr_block_index_t>::Observer observer)
libtransmission::SimpleObservable<tr_torrent*, tr_block_index_t>::Observer observer)
{
return swarm_.got_block.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_got_choke(
libtransmission::SimpleObservable<tr_torrent*, tr_bitfield const&>::Observer observer)
{
return swarm_.got_choke.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_got_have(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer)
{
@ -1002,6 +1027,12 @@ libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_got_have_all(
return swarm_.got_have_all.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_got_reject(
libtransmission::SimpleObservable<tr_torrent*, tr_peer*, tr_block_index_t>::Observer observer)
{
return swarm_.got_reject.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_piece_completed(
libtransmission::SimpleObservable<tr_torrent*, tr_piece_index_t>::Observer observer)
{
@ -1014,6 +1045,18 @@ libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_priority_change
return tor_.priority_changed_.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_sent_cancel(
libtransmission::SimpleObservable<tr_torrent*, tr_peer*, tr_block_index_t>::Observer observer)
{
return swarm_.sent_cancel.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_sent_request(
libtransmission::SimpleObservable<tr_torrent*, tr_peer*, tr_block_span_t>::Observer observer)
{
return swarm_.sent_request.observe(std::move(observer));
}
libtransmission::ObserverTag tr_swarm::WishlistMediator::observe_sequential_download_changed(
libtransmission::SimpleObservable<tr_torrent*, bool>::Observer observer)
{
@ -1028,7 +1071,6 @@ private:
static auto constexpr BandwidthTimerPeriod = 500ms;
static auto constexpr PeerInfoPeriod = 1min;
static auto constexpr RechokePeriod = 10s;
static auto constexpr RefillUpkeepPeriod = 10s;
// Max number of outbound peer connections to initiate.
// This throttle is an arbitrary number to avoid overloading routers.
@ -1063,13 +1105,11 @@ public:
, bandwidth_timer_{ timer_maker.create([this]() { bandwidth_pulse(); }) }
, peer_info_timer_{ timer_maker.create([this]() { peer_info_pulse(); }) }
, rechoke_timer_{ timer_maker.create([this]() { rechoke_pulse_marshall(); }) }
, refill_upkeep_timer_{ timer_maker.create([this]() { refill_upkeep(); }) }
, blocklists_tag_{ blocklist.observe_changes([this]() { on_blocklists_changed(); }) }
{
bandwidth_timer_->start_repeating(BandwidthTimerPeriod);
peer_info_timer_->start_repeating(PeerInfoPeriod);
rechoke_timer_->start_repeating(RechokePeriod);
refill_upkeep_timer_->start_repeating(RefillUpkeepPeriod);
}
tr_peerMgr(tr_peerMgr&&) = delete;
@ -1112,7 +1152,6 @@ private:
void peer_info_pulse();
void rechoke_pulse() const;
void reconnect_pulse();
void refill_upkeep() const;
void rechoke_pulse_marshall()
{
@ -1143,7 +1182,6 @@ private:
std::unique_ptr<libtransmission::Timer> const bandwidth_timer_;
std::unique_ptr<libtransmission::Timer> const peer_info_timer_;
std::unique_ptr<libtransmission::Timer> const rechoke_timer_;
std::unique_ptr<libtransmission::Timer> const refill_upkeep_timer_;
libtransmission::ObserverTag const blocklists_tag_;
};
@ -1153,18 +1191,11 @@ private:
tr_peer::tr_peer(tr_torrent const& tor)
: session{ tor.session }
, swarm{ tor.swarm }
, active_requests{ tor.block_count() }
, blame{ tor.piece_count() }
{
}
tr_peer::~tr_peer()
{
if (swarm != nullptr)
{
swarm->active_requests.remove(this);
}
}
// ---
tr_peerMgr* tr_peerMgrNew(tr_session* session)
@ -1194,54 +1225,18 @@ void tr_peerMgrFree(tr_peerMgr* manager)
* tr_peerMgrGetNextRequests() is called.
*/
// --- struct block_request
// TODO: if we keep this, add equivalent API to ActiveRequest
void tr_peerMgrClientSentRequests(tr_torrent* torrent, tr_peer* peer, tr_block_span_t span)
{
auto const now = tr_time();
for (tr_block_index_t block = span.begin; block < span.end; ++block)
{
torrent->swarm->active_requests.add(block, peer, now);
}
}
std::vector<tr_block_span_t> tr_peerMgrGetNextRequests(tr_torrent* torrent, tr_peer const* peer, size_t numwant)
{
TR_ASSERT(!torrent->is_done());
tr_swarm& swarm = *torrent->swarm;
if (!swarm.wishlist)
{
swarm.wishlist = std::make_unique<Wishlist>(std::make_unique<tr_swarm::WishlistMediator>(swarm));
swarm.wishlist = std::make_unique<Wishlist>(swarm.wishlist_mediator);
}
swarm.update_endgame();
return swarm.wishlist->next(
numwant,
[peer](tr_piece_index_t p) { return peer->has_piece(p); },
[peer, &swarm](tr_block_index_t b) { return swarm.active_requests.has(b, peer); });
}
// --- Piece List Manipulation / Accessors
bool tr_peerMgrDidPeerRequest(tr_torrent const* tor, tr_peer const* peer, tr_block_index_t block)
{
return tor->swarm->active_requests.has(block, peer);
}
size_t tr_peerMgrCountActiveRequestsToPeer(tr_torrent const* tor, tr_peer const* peer)
{
return tor->swarm->active_requests.count(peer);
}
void tr_peerMgr::refill_upkeep() const
{
auto const lock = unique_lock();
for (auto* const tor : torrents_)
{
tor->swarm->cancel_old_requests();
}
[peer](tr_block_index_t b) { return peer->active_requests.test(b); });
}
namespace
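
The `block_span()` override above trims overlapping spans caused by blocks that straddle piece boundaries. A small worked example with hypothetical sizes (16 KiB blocks and a 24 KiB piece length chosen purely to force misalignment; the real adjustment uses `tor_.block_loc()` rather than this arithmetic):

// Worked example of why unaligned block spans overlap: blocks are fixed-size
// slices of the whole torrent, so a piece length that is not a multiple of the
// block size makes one block straddle two pieces.
#include <cstdint>
#include <cstdio>

int main()
{
    constexpr uint64_t BlockSize = 16384; // 16 KiB, as in tr_block_info::BlockSize
    constexpr uint64_t PieceSize = 24576; // hypothetical 24 KiB piece length

    for (uint64_t piece = 0; piece < 3; ++piece)
    {
        uint64_t const first_byte = piece * PieceSize;
        uint64_t const last_byte = first_byte + PieceSize - 1;
        uint64_t begin = first_byte / BlockSize;    // naive span start; may overlap the previous piece
        uint64_t const end = last_byte / BlockSize + 1;
        if (begin * BlockSize / PieceSize != piece) // the span's first block begins in an earlier piece...
        {
            ++begin;                                // ...so leave that block to the earlier piece only
        }
        std::printf(
            "piece %llu -> blocks [%llu, %llu)\n",
            static_cast<unsigned long long>(piece),
            static_cast<unsigned long long>(begin),
            static_cast<unsigned long long>(end));
    }
    // prints [0,2) [2,3) [3,5); without the adjustment, pieces 0 and 1 would both claim block 1
    return 0;
}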

View file

@ -633,12 +633,6 @@ void tr_peerMgrFree(tr_peerMgr* manager);
[[nodiscard]] std::vector<tr_block_span_t> tr_peerMgrGetNextRequests(tr_torrent* torrent, tr_peer const* peer, size_t numwant);
[[nodiscard]] bool tr_peerMgrDidPeerRequest(tr_torrent const* torrent, tr_peer const* peer, tr_block_index_t block);
void tr_peerMgrClientSentRequests(tr_torrent* torrent, tr_peer* peer, tr_block_span_t span);
[[nodiscard]] size_t tr_peerMgrCountActiveRequestsToPeer(tr_torrent const* torrent, tr_peer const* peer);
void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_peer_socket&& socket);
size_t tr_peerMgrAddPex(tr_torrent* tor, tr_peer_from from, tr_pex const* pex, size_t n_pex);

View file

@ -360,7 +360,7 @@ public:
switch (dir)
{
case TR_CLIENT_TO_PEER: // requests we sent
return tr_peerMgrCountActiveRequestsToPeer(&tor_, this);
return active_requests.count();
case TR_PEER_TO_CLIENT: // requests they sent
return std::size(peer_requested_);
@ -401,10 +401,15 @@ public:
update_active();
}
void cancel_block_request(tr_block_index_t block) override
void maybe_cancel_block_request(tr_block_index_t block) override
{
cancels_sent_to_peer.add(tr_time(), 1);
protocol_send_cancel(peer_request::from_block(tor_, block));
if (active_requests.test(block))
{
cancels_sent_to_peer.add(tr_time(), 1);
active_requests.unset(block);
publish(tr_peer_event::SentCancel(tor_.block_info(), block));
protocol_send_cancel(peer_request::from_block(tor_, block));
}
}
void set_choke(bool peer_is_choked) override
@ -462,9 +467,15 @@ public:
TR_ASSERT(client_is_interested());
TR_ASSERT(!client_is_choked());
if (active_requests.has_none())
{
request_timeout_base_ = tr_time();
}
for (auto const *span = block_spans, *span_end = span + n_spans; span != span_end; ++span)
{
for (auto [block, block_end] = *span; block < block_end; ++block)
auto const [block_begin, block_end] = *span;
for (auto block = block_begin; block < block_end; ++block)
{
// Note that requests can't cross over a piece boundary.
// So if a piece isn't evenly divisible by the block size,
@ -483,7 +494,8 @@ public:
}
}
tr_peerMgrClientSentRequests(&tor_, this, *span);
active_requests.set_span(block_begin, block_end);
publish(tr_peer_event::SentRequest(tor_.block_info(), *span));
}
}
@ -568,7 +580,9 @@ private:
desired_request_count_ = max_available_reqs();
}
void update_block_requests();
void maybe_send_block_requests();
void check_request_timeout(time_t now);
[[nodiscard]] constexpr auto client_reqq() const noexcept
{
@ -591,7 +605,7 @@ private:
return next;
}
void update_metadata_requests(time_t now) const;
void maybe_send_metadata_requests(time_t now) const;
[[nodiscard]] size_t add_next_metadata_piece();
[[nodiscard]] size_t add_next_block(time_t now_sec, uint64_t now_msec);
[[nodiscard]] size_t fill_output_buffer(time_t now_sec, uint64_t now_msec);
@ -700,6 +714,8 @@ private:
time_t choke_changed_at_ = 0;
time_t request_timeout_base_ = {};
tr_incoming incoming_ = {};
// if the peer supports the Extension Protocol in BEP 10 and
@ -715,6 +731,9 @@ private:
// seconds between periodic send_ut_pex() calls
static auto constexpr SendPexInterval = 90s;
// how many seconds we expect the next piece block to arrive
static auto constexpr RequestTimeoutSecs = time_t{ 90 };
};
// ---
@ -1396,6 +1415,7 @@ ReadResult tr_peerMsgsImpl::process_peer_message(uint8_t id, MessageReader& payl
if (!fext)
{
publish(tr_peer_event::GotChoke());
active_requests.set_has_none();
}
update_active(TR_PEER_TO_CLIENT);
@ -1581,7 +1601,11 @@ ReadResult tr_peerMsgsImpl::process_peer_message(uint8_t id, MessageReader& payl
if (fext)
{
publish(tr_peer_event::GotRejected(tor_.block_info(), tor_.piece_loc(r.index, r.offset).block));
if (auto const block = tor_.piece_loc(r.index, r.offset).block; active_requests.test(block))
{
active_requests.unset(block);
publish(tr_peer_event::GotRejected(tor_.block_info(), block));
}
}
else
{
@ -1620,13 +1644,19 @@ ReadResult tr_peerMsgsImpl::read_piece_data(MessageReader& payload)
if (loc.block_offset + len > block_size)
{
logwarn(this, fmt::format("got unaligned piece {:d}:{:d}->{:d}", piece, offset, len));
logwarn(this, fmt::format("got unaligned block {:d} ({:d}:{:d}->{:d})", block, piece, offset, len));
return { ReadState::Err, len };
}
if (!tr_peerMgrDidPeerRequest(&tor_, this, block))
if (!active_requests.test(block))
{
logwarn(this, fmt::format("got unrequested piece {:d}:{:d}->{:d}", piece, offset, len));
logwarn(this, fmt::format("got unrequested block {:d} ({:d}:{:d}->{:d})", block, piece, offset, len));
return { ReadState::Err, len };
}
if (tor_.has_block(block))
{
logtrace(this, fmt::format("got completed block {:d} ({:d}:{:d}->{:d})", block, piece, offset, len));
return { ReadState::Err, len };
}
@ -1664,35 +1694,15 @@ ReadResult tr_peerMsgsImpl::read_piece_data(MessageReader& payload)
// returns 0 on success, or an errno on failure
int tr_peerMsgsImpl::client_got_block(std::unique_ptr<Cache::BlockData> block_data, tr_block_index_t const block)
{
auto const n_expected = tor_.block_size(block);
if (!block_data)
if (auto const n_bytes = block_data ? std::size(*block_data) : 0U; n_bytes != tor_.block_size(block))
{
logdbg(this, fmt::format("wrong block size: expected {:d}, got {:d}", n_expected, 0));
return EMSGSIZE;
}
if (std::size(*block_data) != tor_.block_size(block))
{
logdbg(this, fmt::format("wrong block size: expected {:d}, got {:d}", n_expected, std::size(*block_data)));
auto const n_expected = tor_.block_size(block);
logdbg(this, fmt::format("wrong block size: expected {:d}, got {:d}", n_expected, n_bytes));
return EMSGSIZE;
}
logtrace(this, fmt::format("got block {:d}", block));
if (!tr_peerMgrDidPeerRequest(&tor_, this, block))
{
logdbg(this, "we didn't ask for this message...");
return 0;
}
auto const loc = tor_.block_loc(block);
if (tor_.has_piece(loc.piece))
{
logtrace(this, "we did ask for this message, but the piece is already complete...");
return 0;
}
// NB: if writeBlock() fails the torrent may be paused.
// If this happens, this object will be destructed and must no longer be used.
if (auto const err = session->cache->write_block(tor_.id(), block, std::move(block_data)); err != 0)
@ -1700,6 +1710,8 @@ int tr_peerMsgsImpl::client_got_block(std::unique_ptr<Cache::BlockData> block_da
return err;
}
active_requests.unset(block);
request_timeout_base_ = tr_time();
publish(tr_peer_event::GotBlock(tor_.block_info(), block));
return 0;
@ -1716,8 +1728,6 @@ void tr_peerMsgsImpl::did_write(tr_peerIo* /*io*/, size_t bytes_written, bool wa
msgs->peer_info->set_latest_piece_data_time(tr_time());
msgs->publish(tr_peer_event::SentPieceData(bytes_written));
}
msgs->pulse();
}
ReadState tr_peerMsgsImpl::can_read(tr_peerIo* io, void* vmsgs, size_t* piece)
@@ -1814,9 +1824,10 @@ void tr_peerMsgsImpl::pulse()
auto const now_sec = tr_time();
auto const now_msec = tr_time_msec();
check_request_timeout(now_sec);
update_desired_request_count();
update_block_requests();
update_metadata_requests(now_sec);
maybe_send_block_requests();
maybe_send_metadata_requests(now_sec);
for (;;)
{
@@ -1827,7 +1838,7 @@ void tr_peerMsgsImpl::pulse()
}
}
void tr_peerMsgsImpl::update_metadata_requests(time_t now) const
void tr_peerMsgsImpl::maybe_send_metadata_requests(time_t now) const
{
if (!peer_supports_metadata_xfer_)
{
@@ -1844,14 +1855,14 @@ void tr_peerMsgsImpl::update_metadata_requests(time_t now) const
}
}
void tr_peerMsgsImpl::update_block_requests()
void tr_peerMsgsImpl::maybe_send_block_requests()
{
if (!tor_.client_can_download())
{
return;
}
auto const n_active = tr_peerMgrCountActiveRequestsToPeer(&tor_, this);
auto const n_active = active_req_count(TR_CLIENT_TO_PEER);
if (n_active >= desired_request_count_)
{
return;
@@ -1867,6 +1878,23 @@ void tr_peerMsgsImpl::update_block_requests()
}
}
void tr_peerMsgsImpl::check_request_timeout(time_t now)
{
if (active_requests.has_none() || now - request_timeout_base_ <= RequestTimeoutSecs)
{
return;
}
// If we didn't receive any piece data from this peer for a while,
// cancel all active requests so that we will send a new batch.
// If the peer still doesn't send anything to us, then it will
// naturally get weeded out by the peer mgr.
for (size_t block = 0; block < std::size(active_requests); ++block)
{
maybe_cancel_block_request(block);
}
}
[[nodiscard]] size_t tr_peerMsgsImpl::fill_output_buffer(time_t now_sec, uint64_t now_msec)
{
auto n_bytes_written = size_t{};

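Editor's note: for orientation, the following is a minimal, self-contained sketch (not part of this commit) of the per-peer request bookkeeping the hunks above add to tr_peerMsgsImpl: a bitfield of outstanding block requests plus a single request_timeout_base_ timestamp, cleared on choke, refreshed by received blocks, and swept by check_request_timeout(). std::vector<bool> stands in for tr_bitfield, and every name below is illustrative rather than the commit's exact API.

// Editorial sketch only; assumes the semantics visible in the hunks above.
#include <algorithm>
#include <cstddef>
#include <ctime>
#include <vector>

class PerPeerRequestsSketch
{
public:
    explicit PerPeerRequestsSketch(size_t n_blocks)
        : active_(n_blocks, false)
    {
    }

    // maybe_send_block_requests(): mark a span of blocks as requested.
    void on_requests_sent(size_t begin, size_t end, time_t now)
    {
        for (auto block = begin; block < end; ++block)
        {
            active_[block] = true;
        }
        timeout_base_ = now; // assumed; the hunk that first sets the base is not shown above
    }

    // client_got_block(): clear the bit and refresh the timeout base.
    void on_block_received(size_t block, time_t now)
    {
        active_[block] = false;
        timeout_base_ = now;
    }

    // GotChoke without the fast extension drops every outstanding request.
    void on_choke()
    {
        std::fill(std::begin(active_), std::end(active_), false);
    }

    // check_request_timeout(): if no piece data has arrived for timeout_secs,
    // drop everything so a fresh batch can be requested (the real code cancels
    // each block individually via maybe_cancel_block_request()).
    void check_request_timeout(time_t now, time_t timeout_secs = 90)
    {
        if (count() == 0U || now - timeout_base_ <= timeout_secs)
        {
            return;
        }
        on_choke();
    }

    // used by maybe_send_block_requests() to compare against the desired count
    [[nodiscard]] size_t count() const
    {
        return static_cast<size_t>(std::count(std::begin(active_), std::end(active_), true));
    }

private:
    std::vector<bool> active_;
    time_t timeout_base_ = 0;
};

The point the diff relies on is that a received block both clears its bit and refreshes the timeout base, so the 90-second sweep only fires when no piece data has arrived at all.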
View file

@@ -342,6 +342,11 @@ struct tr_torrent
return completion_.has_block(block);
}
[[nodiscard]] auto has_blocks(tr_block_span_t span) const
{
return completion_.has_blocks(span);
}
[[nodiscard]] auto count_missing_blocks_in_piece(tr_piece_index_t piece) const
{
return completion_.count_missing_blocks_in_piece(piece);

View file

@@ -220,11 +220,7 @@ public:
{
if (dir == TR_CLIENT_TO_PEER) // blocks we've requested
{
return std::accumulate(
std::begin(tasks),
std::end(tasks),
size_t{},
[](size_t sum, auto const* task) { return sum + (task->blocks.end - task->blocks.begin); });
return active_requests.count();
}
// webseed will never request blocks from us
@@ -268,12 +264,16 @@ public:
connection_limiter.got_data();
}
void publish_rejection(tr_block_span_t block_span)
void on_rejection(tr_block_span_t block_span)
{
for (auto block = block_span.begin; block < block_span.end; ++block)
{
publish(tr_peer_event::GotRejected(tor.block_info(), block));
if (active_requests.test(block))
{
publish(tr_peer_event::GotRejected(tor.block_info(), block));
}
}
active_requests.unset_span(block_span.begin, block_span.end);
}
void request_blocks(tr_block_span_t const* block_spans, size_t n_spans) override
@@ -289,7 +289,8 @@ public:
tasks.insert(task);
task->request_next_chunk();
tr_peerMgrClientSentRequests(&tor, this, *span);
active_requests.set_span(span->begin, span->end);
publish(tr_peer_event::SentRequest(tor.block_info(), *span));
}
}
@@ -396,6 +397,7 @@ void tr_webseed_task::use_fetched_blocks()
auto data = std::unique_ptr<Cache::BlockData>{ block_buf };
if (auto const* const torrent = tr_torrentFindFromId(session, tor_id); torrent != nullptr)
{
webseed->active_requests.unset(block);
session->cache->write_block(tor_id, block, std::move(data));
webseed->publish(tr_peer_event::GotBlock(torrent->block_info(), block));
}
@@ -442,7 +444,7 @@ void tr_webseed_task::on_partial_data_fetched(tr_web::FetchResponse const& web_r
if (!success)
{
webseed->publish_rejection({ task->loc_.block, task->blocks.end });
webseed->on_rejection({ task->loc_.block, task->blocks.end });
webseed->tasks.erase(task);
delete task;
return;

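Editor's note: a compact sketch (again, not from the commit) of the webseed side of the same idea, matching the request_blocks()/use_fetched_blocks()/on_rejection() hunks above: the span is marked when requests are sent, single blocks are cleared as they are written, and a failed fetch publishes GotRejected only for blocks still marked before clearing the whole span. std::vector<bool> stands in for tr_bitfield, and publish_rejected() is a hypothetical stand-in for the event publishing.

// Editorial sketch; names here are illustrative stand-ins, not libtransmission API.
#include <cstddef>
#include <cstdio>
#include <vector>

struct WebseedRequestsSketch
{
    std::vector<bool> active;

    explicit WebseedRequestsSketch(size_t n_blocks)
        : active(n_blocks, false)
    {
    }

    // request_blocks(): set_span(begin, end)
    void on_requests_sent(size_t begin, size_t end)
    {
        for (auto block = begin; block < end; ++block)
        {
            active[block] = true;
        }
    }

    // use_fetched_blocks(): unset(block) once the block is handed to the cache
    void on_block_written(size_t block)
    {
        active[block] = false;
    }

    // on_rejection(): publish only for blocks still outstanding, then clear the span
    void on_rejection(size_t begin, size_t end)
    {
        for (auto block = begin; block < end; ++block)
        {
            if (active[block])
            {
                publish_rejected(block);
            }
        }
        for (auto block = begin; block < end; ++block) // unset_span(begin, end)
        {
            active[block] = false;
        }
    }

    static void publish_rejected(size_t block) // placeholder for tr_peer_event::GotRejected
    {
        std::printf("GotRejected: block %zu\n", block);
    }
};

Testing the bit before publishing means blocks that use_fetched_blocks() already wrote are not re-reported as rejected when the rest of the fetch fails, which appears to be the motivation for renaming publish_rejection() to on_rejection() above.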
View file

@@ -31,7 +31,6 @@ target_sources(libtransmission-test
move-test.cc
net-test.cc
open-files-test.cc
peer-mgr-active-requests-test.cc
peer-mgr-wishlist-test.cc
peer-msgs-test.cc
platform-test.cc

View file

@@ -1,191 +0,0 @@
// This file Copyright (C) 2021-2022 Mnemosyne LLC.
// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only),
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#define LIBTRANSMISSION_PEER_MODULE
#include <algorithm>
#include <ctime> // time_t
#include <vector>
#include <libtransmission/transmission.h> // tr_block_index_t
#include <libtransmission/peer-mgr-active-requests.h>
#include "gtest/gtest.h"
struct tr_peer;
class PeerMgrActiveRequestsTest : public ::testing::Test
{
protected:
tr_peer* peer_a_ = reinterpret_cast<tr_peer*>(0xCAFE);
tr_peer* peer_b_ = reinterpret_cast<tr_peer*>(0xDEAD);
tr_peer* peer_c_ = reinterpret_cast<tr_peer*>(0xBEEF);
};
// consider: making it a templated class so that tr_peer can be replaced with X
TEST_F(PeerMgrActiveRequestsTest, requestsAreNotAddedTwice)
{
auto requests = ActiveRequests{};
auto const block = tr_block_index_t{ 100 };
auto const peer = static_cast<tr_peer*>(nullptr);
auto const when = time_t{};
EXPECT_TRUE(requests.add(block, peer, when));
EXPECT_FALSE(requests.add(block, peer, when));
EXPECT_FALSE(requests.add(block, peer, when));
EXPECT_FALSE(requests.add(block, peer, when + 1));
}
TEST_F(PeerMgrActiveRequestsTest, requestsMadeAreCounted)
{
auto requests = ActiveRequests{};
auto const block = tr_block_index_t{ 100 };
auto const peer = static_cast<tr_peer*>(nullptr);
auto const when = time_t{};
EXPECT_EQ(0U, requests.count(block));
EXPECT_EQ(0U, requests.count(peer));
EXPECT_EQ(0U, requests.size());
EXPECT_TRUE(requests.add(block, peer, when));
EXPECT_EQ(1U, requests.count(block));
EXPECT_EQ(1U, requests.count(peer));
EXPECT_EQ(1U, requests.size());
}
TEST_F(PeerMgrActiveRequestsTest, requestsAreRemoved)
{
auto requests = ActiveRequests{};
auto const block = tr_block_index_t{ 100 };
auto const peer = static_cast<tr_peer*>(nullptr);
auto const when = time_t{};
EXPECT_TRUE(requests.add(block, peer, when));
EXPECT_EQ(1U, requests.count(block));
EXPECT_EQ(1U, requests.count(peer));
EXPECT_EQ(1U, requests.size());
EXPECT_TRUE(requests.remove(block, peer));
EXPECT_EQ(0U, requests.count(block));
EXPECT_EQ(0U, requests.count(peer));
EXPECT_EQ(0U, requests.size());
EXPECT_FALSE(requests.remove(block, peer));
EXPECT_EQ(0U, requests.count(block));
EXPECT_EQ(0U, requests.count(peer));
EXPECT_EQ(0U, requests.size());
}
TEST_F(PeerMgrActiveRequestsTest, peersAreRemoved)
{
auto requests = ActiveRequests{};
auto const block = tr_block_index_t{ 100 };
auto const peer = static_cast<tr_peer*>(nullptr);
auto const when = time_t{};
// setup: add a request
EXPECT_TRUE(requests.add(block, peer, when));
EXPECT_EQ(1U, requests.count(block));
EXPECT_EQ(1U, requests.count(peer));
EXPECT_EQ(1U, requests.size());
// try removing requests for that block (should remove the 1 active request)
auto const removed = requests.remove(block);
EXPECT_EQ(std::vector<tr_peer*>{ peer }, removed);
EXPECT_EQ(0U, requests.count(block));
EXPECT_EQ(0U, requests.count(peer));
EXPECT_EQ(0U, requests.size());
// try removing requests for that block again (should remove nothing)
EXPECT_EQ(std::vector<tr_peer*>{}, requests.remove(block));
}
TEST_F(PeerMgrActiveRequestsTest, multiplePeersAreRemoved)
{
// setup
auto requests = ActiveRequests{};
auto const block_a = tr_block_index_t{ 128 };
auto const when_a = 100;
EXPECT_TRUE(requests.add(block_a, peer_a_, when_a));
auto const block_b = block_a;
auto const when_b = 200;
EXPECT_TRUE(requests.add(block_b, peer_b_, when_b));
auto const block_c = tr_block_index_t{ 256 };
auto const when_c = when_b;
EXPECT_TRUE(requests.add(block_c, peer_c_, when_c));
EXPECT_EQ(block_a, block_b);
EXPECT_EQ(2U, requests.count(block_a));
EXPECT_EQ(1U, requests.count(block_c));
EXPECT_EQ(3U, requests.size());
// now remove block_a, which was req'd by peer_a_ and peer_b_
auto expected = std::vector<tr_peer*>{ peer_a_, peer_b_ };
std::sort(std::begin(expected), std::end(expected));
auto removed = requests.remove(block_a);
std::sort(std::begin(removed), std::end(removed));
EXPECT_EQ(expected, removed);
}
TEST_F(PeerMgrActiveRequestsTest, multipleBlocksAreRemoved)
{
// setup
auto requests = ActiveRequests{};
auto const block_a1 = tr_block_index_t{ 128 };
auto const when_a1 = 300;
EXPECT_TRUE(requests.add(block_a1, peer_a_, when_a1));
auto const block_a2 = tr_block_index_t{ 256 };
auto const when_a2 = 400;
EXPECT_TRUE(requests.add(block_a2, peer_a_, when_a2));
EXPECT_EQ(2U, requests.size());
EXPECT_EQ(2U, requests.count(peer_a_));
EXPECT_EQ(1U, requests.count(block_a1));
EXPECT_EQ(0U, requests.count(peer_b_));
EXPECT_EQ(0U, requests.count(tr_block_index_t{ 512 }));
// confirm that removing peer_a_ removes all of its requests
auto expected = std::vector<tr_block_index_t>{ block_a1, block_a2 };
std::sort(std::begin(expected), std::end(expected));
auto removed = requests.remove(peer_a_);
std::sort(std::begin(removed), std::end(removed));
EXPECT_EQ(expected, removed);
EXPECT_EQ(0U, requests.size());
EXPECT_EQ(0U, requests.count(peer_a_));
EXPECT_EQ(0U, requests.count(block_a1));
}
TEST_F(PeerMgrActiveRequestsTest, sentBefore)
{
// setup
auto requests = ActiveRequests{};
auto const block_a1 = tr_block_index_t{ 128 };
auto const when_a1 = 300;
EXPECT_TRUE(requests.add(block_a1, peer_a_, when_a1));
auto const block_a2 = tr_block_index_t{ 256 };
auto const when_a2 = 400;
EXPECT_TRUE(requests.add(block_a2, peer_a_, when_a2));
EXPECT_EQ(2U, requests.size());
EXPECT_EQ(2U, requests.count(peer_a_));
EXPECT_EQ(1U, requests.count(block_a1));
// test that the timestamps are counted correctly
EXPECT_EQ(0U, std::size(requests.sentBefore(when_a1 - 1)));
EXPECT_EQ(0U, std::size(requests.sentBefore(when_a1)));
EXPECT_EQ(1U, std::size(requests.sentBefore(when_a1 + 1)));
EXPECT_EQ(1U, std::size(requests.sentBefore(when_a2 - 1)));
EXPECT_EQ(1U, std::size(requests.sentBefore(when_a2)));
EXPECT_EQ(2U, std::size(requests.sentBefore(when_a2 + 1)));
// test that the returned block + peer pairs are correct
auto items = requests.sentBefore(when_a1 + 1);
ASSERT_EQ(1U, std::size(items));
EXPECT_EQ(block_a1, items[0].first);
EXPECT_EQ(peer_a_, items[0].second);
}

File diff suppressed because it is too large