mirror of
https://github.com/transmission/transmission
synced 2025-03-15 16:29:34 +00:00
perf: use libsmall in libtransmission, pt 3 (#5653)
* refactor: use BufferReader, BufferWriter in peer-socket * feat: expose GrowthFactor in tr-buffer * perf: choose better defaults for the peer message buffers * chore: sync tests * refactor: use small::map in ActiveRequests::Impl
This commit is contained in:
parent
38ea020eca
commit
3b03494580
9 changed files with 58 additions and 86 deletions
|
@ -419,7 +419,7 @@ size_t tr_peerIo::try_read(size_t max)
|
|||
|
||||
auto& buf = inbuf_;
|
||||
tr_error* error = nullptr;
|
||||
auto const n_read = socket_.try_read(buf, max, &error);
|
||||
auto const n_read = socket_.try_read(buf, max, std::empty(buf), &error);
|
||||
set_enabled(Dir, error == nullptr || canRetryFromError(error->code));
|
||||
|
||||
if (error != nullptr)
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "transmission.h"
|
||||
|
||||
#include "bandwidth.h"
|
||||
#include "block-info.h"
|
||||
#include "net.h" // tr_address
|
||||
#include "peer-mse.h"
|
||||
#include "peer-socket.h"
|
||||
|
@ -291,8 +292,17 @@ public:
|
|||
static void utp_init(struct_utp_context* ctx);
|
||||
|
||||
private:
|
||||
// size of the buffer we use to hold incoming & outgoing messages
|
||||
static constexpr auto InitialBufferSize = tr_block_info::BlockSize + sizeof(uint8_t) + sizeof(uint32_t) + sizeof(uint32_t);
|
||||
|
||||
// our target socket receive buffer size
|
||||
static constexpr auto RcvBuf = size_t{ 256 * 1024 };
|
||||
|
||||
// start with a buffer size large enough to hold a BT block message,
|
||||
// but avoid repeated reallocs by scaling up very quickly (5X) when
|
||||
// we need more capacity.
|
||||
using Buffer = libtransmission::SmallBuffer<InitialBufferSize, std::byte, std::ratio<5, 1>>;
|
||||
|
||||
friend class libtransmission::test::HandshakeTest;
|
||||
|
||||
[[nodiscard]] constexpr auto is_seed() const noexcept
|
||||
|
@ -344,8 +354,8 @@ private:
|
|||
|
||||
tr_sha1_digest_t info_hash_;
|
||||
|
||||
libtransmission::Buffer inbuf_;
|
||||
libtransmission::Buffer outbuf_;
|
||||
Buffer inbuf_;
|
||||
Buffer outbuf_;
|
||||
|
||||
tr_session* const session_;
|
||||
|
||||
|
|
|
@ -11,53 +11,16 @@
|
|||
#include <unordered_set>
|
||||
#include <vector>
|
||||
|
||||
#include <small/map.hpp>
|
||||
|
||||
#define LIBTRANSMISSION_PEER_MODULE
|
||||
|
||||
#include "libtransmission/transmission.h"
|
||||
|
||||
#include "libtransmission/peer-mgr-active-requests.h"
|
||||
#include "libtransmission/peer-mgr-wishlist.h"
|
||||
#include "libtransmission/tr-assert.h"
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
struct peer_at
|
||||
{
|
||||
tr_peer* peer;
|
||||
time_t when;
|
||||
|
||||
peer_at(tr_peer* p, time_t w)
|
||||
: peer{ p }
|
||||
, when{ w }
|
||||
{
|
||||
}
|
||||
|
||||
[[nodiscard]] int compare(peer_at const& that) const // <=>
|
||||
{
|
||||
if (peer != that.peer)
|
||||
{
|
||||
return peer < that.peer ? -1 : 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool operator==(peer_at const& that) const
|
||||
{
|
||||
return compare(that) == 0;
|
||||
}
|
||||
};
|
||||
|
||||
struct PeerAtHash
|
||||
{
|
||||
std::size_t operator()(peer_at const& pa) const noexcept
|
||||
{
|
||||
return std::hash<tr_peer*>{}(pa.peer);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
class ActiveRequests::Impl
|
||||
{
|
||||
public:
|
||||
|
@ -97,7 +60,7 @@ public:
|
|||
|
||||
std::unordered_map<tr_peer const*, size_t> count_;
|
||||
|
||||
std::unordered_map<tr_block_index_t, std::unordered_set<peer_at, PeerAtHash>> blocks_;
|
||||
std::unordered_map<tr_block_index_t, small::map<tr_peer const*, time_t, Wishlist::EndgameMaxPeers>> blocks_;
|
||||
|
||||
private:
|
||||
size_t size_ = 0;
|
||||
|
@ -126,8 +89,7 @@ bool ActiveRequests::add(tr_block_index_t block, tr_peer* peer, time_t when)
|
|||
bool ActiveRequests::remove(tr_block_index_t block, tr_peer const* peer)
|
||||
{
|
||||
auto const it = impl_->blocks_.find(block);
|
||||
auto const key = peer_at{ const_cast<tr_peer*>(peer), 0 };
|
||||
auto const removed = it != std::end(impl_->blocks_) && it->second.erase(key) != 0;
|
||||
auto const removed = it != std::end(impl_->blocks_) && it->second.erase(peer) != 0;
|
||||
|
||||
if (removed)
|
||||
{
|
||||
|
@ -148,10 +110,9 @@ std::vector<tr_block_index_t> ActiveRequests::remove(tr_peer const* peer)
|
|||
auto removed = std::vector<tr_block_index_t>{};
|
||||
removed.reserve(impl_->blocks_.size());
|
||||
|
||||
auto const key = peer_at{ const_cast<tr_peer*>(peer), 0 };
|
||||
for (auto const& [block, peers_at] : impl_->blocks_)
|
||||
{
|
||||
if (peers_at.count(key) != 0U)
|
||||
if (peers_at.count(peer) != 0U)
|
||||
{
|
||||
removed.push_back(block);
|
||||
}
|
||||
|
@ -178,7 +139,7 @@ std::vector<tr_peer*> ActiveRequests::remove(tr_block_index_t block)
|
|||
std::begin(it->second),
|
||||
std::end(it->second),
|
||||
std::begin(removed),
|
||||
[](auto const& sent) { return sent.peer; });
|
||||
[](auto const& iter) { return const_cast<tr_peer*>(iter.first); });
|
||||
impl_->blocks_.erase(block);
|
||||
}
|
||||
|
||||
|
@ -194,7 +155,7 @@ std::vector<tr_peer*> ActiveRequests::remove(tr_block_index_t block)
|
|||
bool ActiveRequests::has(tr_block_index_t block, tr_peer const* peer) const
|
||||
{
|
||||
auto const it = impl_->blocks_.find(block);
|
||||
return it != std::end(impl_->blocks_) && (it->second.count(peer_at{ const_cast<tr_peer*>(peer), 0 }) != 0U);
|
||||
return it != std::end(impl_->blocks_) && (it->second.count(peer) != 0U);
|
||||
}
|
||||
|
||||
// count how many peers we're asking for `block`
|
||||
|
@ -225,11 +186,11 @@ std::vector<std::pair<tr_block_index_t, tr_peer*>> ActiveRequests::sentBefore(ti
|
|||
|
||||
for (auto const& [block, peers_at] : impl_->blocks_)
|
||||
{
|
||||
for (auto const& sent : peers_at)
|
||||
for (auto const& [peer, sent_at] : peers_at)
|
||||
{
|
||||
if (sent.when < when)
|
||||
if (sent_at < when)
|
||||
{
|
||||
sent_before.emplace_back(block, sent.peer);
|
||||
sent_before.emplace_back(block, const_cast<tr_peer*>(peer));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
class Wishlist
|
||||
{
|
||||
public:
|
||||
static auto constexpr EndgameMaxPeers = size_t{ 2U };
|
||||
|
||||
struct Mediator
|
||||
{
|
||||
[[nodiscard]] virtual bool clientCanRequestBlock(tr_block_index_t block) const = 0;
|
||||
|
|
|
@ -70,7 +70,7 @@ void tr_peer_socket::close()
|
|||
handle = {};
|
||||
}
|
||||
|
||||
size_t tr_peer_socket::try_write(Buffer& buf, size_t max, tr_error** error) const
|
||||
size_t tr_peer_socket::try_write(OutBuf& buf, size_t max, tr_error** error) const
|
||||
{
|
||||
if (max == size_t{})
|
||||
{
|
||||
|
@ -107,7 +107,7 @@ size_t tr_peer_socket::try_write(Buffer& buf, size_t max, tr_error** error) cons
|
|||
return {};
|
||||
}
|
||||
|
||||
size_t tr_peer_socket::try_read(Buffer& buf, size_t max, tr_error** error) const
|
||||
size_t tr_peer_socket::try_read(InBuf& buf, size_t max, [[maybe_unused]] bool is_buf_empty, tr_error** error) const
|
||||
{
|
||||
if (max == size_t{})
|
||||
{
|
||||
|
@ -123,7 +123,7 @@ size_t tr_peer_socket::try_read(Buffer& buf, size_t max, tr_error** error) const
|
|||
// utp_read_drained() notifies libutp that this read buffer is empty.
|
||||
// It opens up the congestion window by sending an ACK (soonish) if
|
||||
// one was not going to be sent.
|
||||
if (is_utp() && std::empty(buf))
|
||||
if (is_utp() && is_buf_empty)
|
||||
{
|
||||
utp_read_drained(handle.utp);
|
||||
}
|
||||
|
|
|
@ -27,8 +27,6 @@ struct tr_session;
|
|||
class tr_peer_socket
|
||||
{
|
||||
public:
|
||||
using Buffer = libtransmission::Buffer;
|
||||
|
||||
tr_peer_socket() = default;
|
||||
tr_peer_socket(tr_session const* session, tr_address const& address, tr_port port, tr_socket_t sock);
|
||||
tr_peer_socket(tr_address const& address, tr_port port, struct UTPSocket* const sock);
|
||||
|
@ -56,8 +54,11 @@ public:
|
|||
}
|
||||
void close();
|
||||
|
||||
size_t try_write(Buffer& buf, size_t max, tr_error** error) const;
|
||||
size_t try_read(Buffer& buf, size_t max, tr_error** error) const;
|
||||
using InBuf = libtransmission::BufferWriter<std::byte>;
|
||||
using OutBuf = libtransmission::BufferReader<std::byte>;
|
||||
|
||||
size_t try_read(InBuf& buf, size_t max, bool is_buf_empty, tr_error** error) const;
|
||||
size_t try_write(OutBuf& buf, size_t max, tr_error** error) const;
|
||||
|
||||
[[nodiscard]] constexpr std::pair<tr_address, tr_port> socketAddress() const noexcept
|
||||
{
|
||||
|
|
|
@ -233,7 +233,7 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
template<size_t N, typename value_type = std::byte>
|
||||
template<size_t N, typename value_type = std::byte, typename GrowthFactor = std::ratio<3, 2>>
|
||||
class SmallBuffer final
|
||||
: public BufferReader<value_type>
|
||||
, public BufferWriter<value_type>
|
||||
|
@ -288,11 +288,9 @@ public:
|
|||
}
|
||||
|
||||
private:
|
||||
small::vector<value_type, N> buf_ = {};
|
||||
small::vector<value_type, N, std::allocator<value_type>, std::true_type, size_t, GrowthFactor> buf_ = {};
|
||||
size_t begin_pos_ = {};
|
||||
size_t end_pos_ = {};
|
||||
};
|
||||
|
||||
using Buffer = SmallBuffer<0, std::byte>;
|
||||
|
||||
} // namespace libtransmission
|
||||
|
|
|
@ -25,6 +25,8 @@
|
|||
|
||||
using namespace std::literals;
|
||||
|
||||
using Buffer = libtransmission::SmallBuffer<1024>;
|
||||
|
||||
class AnnouncerUdpTest : public ::testing::Test
|
||||
{
|
||||
private:
|
||||
|
@ -111,7 +113,7 @@ protected:
|
|||
}
|
||||
}
|
||||
|
||||
[[nodiscard]] static uint32_t parseConnectionRequest(libtransmission::Buffer& buf)
|
||||
[[nodiscard]] static uint32_t parseConnectionRequest(Buffer& buf)
|
||||
{
|
||||
EXPECT_EQ(ProtocolId, buf.to_uint64());
|
||||
EXPECT_EQ(ConnectAction, buf.to_uint32());
|
||||
|
@ -147,7 +149,7 @@ protected:
|
|||
return std::make_pair(buildScrapeRequestFromResponse(response), response);
|
||||
}
|
||||
|
||||
[[nodiscard]] static auto parseScrapeRequest(libtransmission::Buffer& buf, uint64_t expected_connection_id)
|
||||
[[nodiscard]] static auto parseScrapeRequest(Buffer& buf, uint64_t expected_connection_id)
|
||||
{
|
||||
EXPECT_EQ(expected_connection_id, buf.to_uint64());
|
||||
EXPECT_EQ(ScrapeAction, buf.to_uint32());
|
||||
|
@ -162,7 +164,7 @@ protected:
|
|||
return std::make_pair(transaction_id, info_hashes);
|
||||
}
|
||||
|
||||
static void waitForAnnouncerToSendMessage(MockMediator& mediator, libtransmission::Buffer& setme)
|
||||
static void waitForAnnouncerToSendMessage(MockMediator& mediator, Buffer& setme)
|
||||
{
|
||||
libtransmission::test::waitFor(mediator.eventBase(), [&mediator]() { return !std::empty(mediator.sent_); });
|
||||
setme.clear();
|
||||
|
@ -172,7 +174,7 @@ protected:
|
|||
|
||||
[[nodiscard]] static bool sendError(tr_announcer_udp& announcer, uint32_t transaction_id, std::string_view errmsg)
|
||||
{
|
||||
auto buf = libtransmission::Buffer{};
|
||||
auto buf = Buffer{};
|
||||
buf.add_uint32(ErrorAction);
|
||||
buf.add_uint32(transaction_id);
|
||||
buf.add(errmsg);
|
||||
|
@ -187,7 +189,7 @@ protected:
|
|||
[[nodiscard]] static auto sendConnectionResponse(tr_announcer_udp& announcer, uint32_t transaction_id)
|
||||
{
|
||||
auto const connection_id = tr_rand_obj<uint64_t>();
|
||||
auto buf = libtransmission::Buffer{};
|
||||
auto buf = Buffer{};
|
||||
buf.add_uint32(ConnectAction);
|
||||
buf.add_uint32(transaction_id);
|
||||
buf.add_uint64(connection_id);
|
||||
|
@ -250,7 +252,7 @@ protected:
|
|||
EXPECT_EQ(actual.external_ip, expected.external_ip);
|
||||
}
|
||||
|
||||
[[nodiscard]] static auto parseAnnounceRequest(libtransmission::Buffer& buf, uint64_t connection_id)
|
||||
[[nodiscard]] static auto parseAnnounceRequest(Buffer& buf, uint64_t connection_id)
|
||||
{
|
||||
auto req = UdpAnnounceReq{};
|
||||
req.connection_id = buf.to_uint64();
|
||||
|
@ -313,7 +315,7 @@ TEST_F(AnnouncerUdpTest, canScrape)
|
|||
|
||||
// The announcer should have sent a UDP connection request.
|
||||
// Inspect that request for validity.
|
||||
auto sent = libtransmission::Buffer{};
|
||||
auto sent = Buffer{};
|
||||
waitForAnnouncerToSendMessage(mediator, sent);
|
||||
auto connect_transaction_id = parseConnectionRequest(sent);
|
||||
|
||||
|
@ -327,7 +329,7 @@ TEST_F(AnnouncerUdpTest, canScrape)
|
|||
expectEqual(request, info_hashes);
|
||||
|
||||
// Have the tracker respond to the request
|
||||
auto buf = libtransmission::Buffer{};
|
||||
auto buf = Buffer{};
|
||||
buf.add_uint32(ScrapeAction);
|
||||
buf.add_uint32(scrape_transaction_id);
|
||||
buf.add_uint32(expected_response.rows[0].seeders);
|
||||
|
@ -369,7 +371,7 @@ TEST_F(AnnouncerUdpTest, canDestructCleanlyEvenWhenBusy)
|
|||
|
||||
// The announcer should have sent a UDP connection request.
|
||||
// Inspect that request for validity.
|
||||
auto sent = libtransmission::Buffer{};
|
||||
auto sent = Buffer{};
|
||||
waitForAnnouncerToSendMessage(mediator, sent);
|
||||
auto const connect_transaction_id = parseConnectionRequest(sent);
|
||||
EXPECT_NE(0U, connect_transaction_id);
|
||||
|
@ -398,7 +400,7 @@ TEST_F(AnnouncerUdpTest, canMultiScrape)
|
|||
announcer->scrape(request, [&response](tr_scrape_response const& resp) { response = resp; });
|
||||
|
||||
// Announcer will request a connection. Verify and grant the request
|
||||
auto sent = libtransmission::Buffer{};
|
||||
auto sent = Buffer{};
|
||||
waitForAnnouncerToSendMessage(mediator, sent);
|
||||
auto connect_transaction_id = parseConnectionRequest(sent);
|
||||
auto const connection_id = sendConnectionResponse(*announcer, connect_transaction_id);
|
||||
|
@ -410,7 +412,7 @@ TEST_F(AnnouncerUdpTest, canMultiScrape)
|
|||
expectEqual(request, info_hashes);
|
||||
|
||||
// Have the tracker respond to the request
|
||||
auto buf = libtransmission::Buffer{};
|
||||
auto buf = Buffer{};
|
||||
buf.add_uint32(ScrapeAction);
|
||||
buf.add_uint32(scrape_transaction_id);
|
||||
for (int i = 0; i < expected_response.row_count; ++i)
|
||||
|
@ -460,7 +462,7 @@ TEST_F(AnnouncerUdpTest, canHandleScrapeError)
|
|||
|
||||
// The announcer should have sent a UDP connection request.
|
||||
// Inspect that request for validity.
|
||||
auto sent = libtransmission::Buffer{};
|
||||
auto sent = Buffer{};
|
||||
waitForAnnouncerToSendMessage(mediator, sent);
|
||||
auto connect_transaction_id = parseConnectionRequest(sent);
|
||||
|
||||
|
@ -510,7 +512,7 @@ TEST_F(AnnouncerUdpTest, canHandleConnectError)
|
|||
|
||||
// The announcer should have sent a UDP connection request.
|
||||
// Inspect that request for validity.
|
||||
auto sent = libtransmission::Buffer{};
|
||||
auto sent = Buffer{};
|
||||
waitForAnnouncerToSendMessage(mediator, sent);
|
||||
auto transaction_id = parseConnectionRequest(sent);
|
||||
|
||||
|
@ -542,12 +544,12 @@ TEST_F(AnnouncerUdpTest, handleMessageReturnsFalseOnInvalidMessage)
|
|||
|
||||
// The announcer should have sent a UDP connection request.
|
||||
// Inspect that request for validity.
|
||||
auto sent = libtransmission::Buffer{};
|
||||
auto sent = Buffer{};
|
||||
waitForAnnouncerToSendMessage(mediator, sent);
|
||||
auto transaction_id = parseConnectionRequest(sent);
|
||||
|
||||
// send a connection response but with an *invalid* transaction id
|
||||
auto buf = libtransmission::Buffer{};
|
||||
auto buf = Buffer{};
|
||||
buf.add_uint32(ConnectAction);
|
||||
buf.add_uint32(transaction_id + 1);
|
||||
buf.add_uint64(tr_rand_obj<uint64_t>());
|
||||
|
@ -623,7 +625,7 @@ TEST_F(AnnouncerUdpTest, canAnnounce)
|
|||
announcer->announce(request, [&response](tr_announce_response const& resp) { response = resp; });
|
||||
|
||||
// Announcer will request a connection. Verify and grant the request
|
||||
auto sent = libtransmission::Buffer{};
|
||||
auto sent = Buffer{};
|
||||
waitForAnnouncerToSendMessage(mediator, sent);
|
||||
auto connect_transaction_id = parseConnectionRequest(sent);
|
||||
auto const connection_id = sendConnectionResponse(*announcer, connect_transaction_id);
|
||||
|
@ -635,7 +637,7 @@ TEST_F(AnnouncerUdpTest, canAnnounce)
|
|||
expectEqual(request, udp_ann_req);
|
||||
|
||||
// Have the tracker respond to the request
|
||||
auto buf = libtransmission::Buffer{};
|
||||
auto buf = Buffer{};
|
||||
buf.add_uint32(AnnounceAction);
|
||||
buf.add_uint32(udp_ann_req.transaction_id);
|
||||
buf.add_uint32(expected_response.interval);
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
|
||||
using BufferTest = ::testing::Test;
|
||||
using namespace std::literals;
|
||||
using Buffer = libtransmission::Buffer;
|
||||
using Buffer = libtransmission::SmallBuffer<1024>;
|
||||
|
||||
TEST_F(BufferTest, startsWithInSingleSegment)
|
||||
{
|
||||
|
@ -103,9 +103,7 @@ TEST_F(BufferTest, NonBufferWriter)
|
|||
auto constexpr Bang = "!"sv;
|
||||
|
||||
auto out1 = Buffer{};
|
||||
|
||||
auto out2_vec = std::vector<std::byte>{};
|
||||
auto out2 = libtransmission::BufferWriter<std::vector<std::byte>, std::byte>{ &out2_vec };
|
||||
auto out2 = libtransmission::SmallBuffer<1024>{};
|
||||
|
||||
out1.add_uint8(1);
|
||||
out2.add_uint8(1);
|
||||
|
@ -126,7 +124,7 @@ TEST_F(BufferTest, NonBufferWriter)
|
|||
out2.add(Bang);
|
||||
|
||||
auto const result1 = out1.to_string_view();
|
||||
auto const result2 = std::string_view{ reinterpret_cast<char const*>(std::data(out2_vec)), std::size(out2_vec) };
|
||||
auto const result2 = out2.to_string();
|
||||
EXPECT_EQ(result1, result2);
|
||||
}
|
||||
#endif
|
||||
|
|
Loading…
Add table
Reference in a new issue