perf: remove staging step for outbound peer msgs (#5394)

Write non-piece peer messages directly to the peer's outbuf instead of
staging them and waiting for the next pulse() call to flush them. This can
reduce the latency of outgoing messages.
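
In sketch form (simplified; `protocol_send_message()` in the diff below is
the single entry point that replaces the staging buffer):

    // before: append to msgs->outMessages and wait for peerPulse() to flush it
    // after: build the message into a local buffer and write it out immediately
    auto out = Buffer{};
    build_peer_message(msgs, out, type, args...);
    msgs->io->write(out, type == BtPeerMsgs::Piece);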

Change the semantics of `tr_peerIo::get_write_buffer_space()`: this is
now interpreted as the preferred minimum size, rather than the maximum.
It's OK to enqueue an outgoing piece message as long as there's _some_
space left, even if the message is larger than that space.
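
A sketch of the resulting check (this is how `add_next_piece()` in the diff
below decides whether to enqueue): bail out only when there is no space at
all, rather than requiring room for a full block:

    // old: require the whole block to fit
    //   if (io->get_write_buffer_space(now) >= tr_block_info::BlockSize)
    // new: any remaining space is enough to enqueue the next piece message
    if (msgs->io->get_write_buffer_space(now) == 0U || std::empty(msgs->peer_requested_))
    {
        return {}; // nothing enqueued; zero bytes written this pass
    }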

Build peer messages with template fold expressions. This lets us move
all the message-building to a single function and add some sanity checks
to the outgoing messages.
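
For reference, a minimal standalone sketch of the fold-expression pattern
(the `message_length()` helper here is illustrative only, not part of this
diff; the real code folds with `get_param_length()` and `add_param()`):

    #include <cstddef>
    #include <cstdint>

    // Fold over the comma operator: expands to len += sizeof(a1); len += sizeof(a2); ...
    template<typename... Args>
    constexpr std::size_t message_length(std::uint8_t type, Args const&... args)
    {
        auto len = sizeof(type);
        ((len += sizeof(args)), ...);
        return len;
    }

    static_assert(message_length(std::uint8_t{ 4 }, std::uint32_t{ 0 }) == 5U); // a BT 'Have' is 5 bytes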
Charles Kerr 2023-04-14 19:31:09 -05:00 committed by GitHub
parent 9158ae7126
commit e91af26923
1 changed file with 313 additions and 416 deletions

@@ -46,6 +46,7 @@
#endif
using namespace std::literals;
using Buffer = libtransmission::Buffer;
namespace
{
@@ -166,9 +167,6 @@ auto constexpr MetadataReqQ = int{ 64 };
auto constexpr ReqQ = int{ 512 };
// used in lowering the outMessages queue period
auto constexpr ImmediatePriorityIntervalSecs = int{ 0 };
auto constexpr HighPriorityIntervalSecs = int{ 2 };
auto constexpr LowPriorityIntervalSecs = int{ 10 };
// how many blocks to keep prefetched per peer
auto constexpr PrefetchMax = size_t{ 18 };
@@ -219,7 +217,7 @@ struct tr_incoming
{
std::optional<uint32_t> length; // the full message payload length. Includes the +1 for id length
std::optional<uint8_t> id; // the protocol message, e.g. BtPeerMsgs::Piece
libtransmission::Buffer payload;
Buffer payload;
struct incoming_piece_data
{
@@ -243,10 +241,11 @@ void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs);
void didWrite(tr_peerIo* io, size_t bytes_written, bool was_piece_data, void* vmsgs);
void gotError(tr_peerIo* io, tr_error const& err, void* vmsgs);
void peerPulse(void* vmsgs);
void protocolSendCancel(tr_peerMsgsImpl* msgs, struct peer_request const& req);
void protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke);
void protocolSendHave(tr_peerMsgsImpl* msgs, tr_piece_index_t index);
void protocolSendPort(tr_peerMsgsImpl* msgs, tr_port port);
size_t protocolSendCancel(tr_peerMsgsImpl* msgs, struct peer_request const& req);
size_t protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke);
size_t protocolSendHave(tr_peerMsgsImpl* msgs, tr_piece_index_t index);
size_t protocolSendPort(tr_peerMsgsImpl* msgs, tr_port port);
size_t protocolSendRequest(tr_peerMsgsImpl* msgs, struct peer_request const& req);
void sendInterest(tr_peerMsgsImpl* msgs, bool b);
void sendLtepHandshake(tr_peerMsgsImpl* msgs);
void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs);
@@ -255,7 +254,7 @@ void updateDesiredRequestCount(tr_peerMsgsImpl* msgs);
#define myLogMacro(msgs, level, text) \
do \
{ \
if (tr_logLevelIsActive(level)) \
if (true || tr_logLevelIsActive(level)) \
{ \
tr_logAddMessage( \
__FILE__, \
@@ -297,7 +296,6 @@ public:
tr_peer_callback callback,
void* callback_data)
: tr_peerMsgs{ torrent_in, atom_in, client, io_in->is_encrypted(), io_in->is_incoming(), io_in->is_utp() }
, outMessagesBatchPeriod{ LowPriorityIntervalSecs }
, torrent{ torrent_in }
, io{ std::move(io_in) }
, have_{ torrent_in->pieceCount() }
@@ -354,20 +352,6 @@ public:
}
}
void dbgOutMessageLen() const
{
logtrace(this, fmt::format(FMT_STRING("outMessage size is now {:d}"), std::size(outMessages)));
}
void pokeBatchPeriod(int interval)
{
if (outMessagesBatchPeriod > interval)
{
outMessagesBatchPeriod = interval;
logtrace(this, fmt::format(FMT_STRING("lowering batch interval to {:d} seconds"), interval));
}
}
bool isTransferringPieces(uint64_t now, tr_direction dir, tr_bytes_per_second_t* setme_bytes_per_second) const override
{
auto const bytes_per_second = io->get_piece_speed_bytes_per_second(now, dir);
@@ -515,7 +499,7 @@ public:
auto const left_in_block = block_size - loc.block_offset;
auto const left_in_piece = torrent->pieceSize(loc.piece) - loc.piece_offset;
auto const req_len = std::min(left_in_block, left_in_piece);
protocolSendRequest({ loc.piece, loc.piece_offset, req_len });
protocolSendRequest(this, { loc.piece, loc.piece_offset, req_len });
offset += req_len;
}
}
@@ -581,22 +565,6 @@ private:
return max_reqs;
}
void protocolSendRequest(struct peer_request const& req)
{
TR_ASSERT(isValidRequest(req));
auto& out = outMessages;
out.add_uint32(sizeof(uint8_t) + 3 * sizeof(uint32_t));
out.add_uint8(BtPeerMsgs::Request);
out.add_uint32(req.index);
out.add_uint32(req.offset);
out.add_uint32(req.length);
logtrace(this, fmt::format(FMT_STRING("requesting {:d}:{:d}->{:d}..."), req.index, req.offset, req.length));
dbgOutMessageLen();
pokeBatchPeriod(ImmediatePriorityIntervalSecs);
}
[[nodiscard]] bool calculate_active(tr_direction direction) const
{
if (direction == TR_CLIENT_TO_PEER)
@@ -637,11 +605,6 @@ public:
size_t desired_request_count = 0;
/* how long the outMessages batch should be allowed to grow before
* it's flushed -- some messages (like requests >:) should be sent
* very quickly; others aren't as urgent. */
int8_t outMessagesBatchPeriod;
uint8_t ut_pex_id = 0;
uint8_t ut_metadata_id = 0;
@@ -653,8 +616,6 @@ public:
tr_torrent* const torrent;
libtransmission::Buffer outMessages; /* all the non-piece messages */
std::shared_ptr<tr_peerIo> const io;
struct QueuedPeerRequest : public peer_request
@@ -679,7 +640,7 @@ public:
time_t chokeChangedAt = 0;
/* when we started batching the outMessages */
time_t outMessagesBatchedAt = 0;
// time_t outMessagesBatchedAt = 0;
struct tr_incoming incoming = {};
@@ -704,127 +665,220 @@ private:
// ---
void protocolSendReject(tr_peerMsgsImpl* msgs, struct peer_request const* req)
[[nodiscard]] constexpr bool messageLengthIsCorrect(tr_torrent const* const tor, uint8_t id, uint32_t len)
{
switch (id)
{
case BtPeerMsgs::Choke:
case BtPeerMsgs::Unchoke:
case BtPeerMsgs::Interested:
case BtPeerMsgs::NotInterested:
case BtPeerMsgs::FextHaveAll:
case BtPeerMsgs::FextHaveNone:
return len == 1U;
case BtPeerMsgs::Have:
case BtPeerMsgs::FextSuggest:
case BtPeerMsgs::FextAllowedFast:
return len == 5U;
case BtPeerMsgs::Bitfield:
return !tor->hasMetainfo() || len == 1 + ((tor->pieceCount() + 7U) / 8U);
case BtPeerMsgs::Request:
case BtPeerMsgs::Cancel:
case BtPeerMsgs::FextReject:
return len == 13U;
case BtPeerMsgs::Piece:
len -= sizeof(id) + sizeof(uint32_t /*piece*/) + sizeof(uint32_t /*offset*/);
return len <= tr_block_info::BlockSize;
case BtPeerMsgs::Port:
return len == 3U;
case BtPeerMsgs::Ltep:
return len >= 2U;
default: // unrecognized message
return false;
}
}
namespace protocol_send_message_helpers
{
namespace
{
[[nodiscard]] constexpr auto get_param_length(uint8_t param) noexcept
{
return sizeof(param);
}
[[nodiscard]] constexpr auto get_param_length(uint16_t param) noexcept
{
return sizeof(param);
}
[[nodiscard]] constexpr auto get_param_length(uint32_t param) noexcept
{
return sizeof(param);
}
template<typename T>
[[nodiscard]] TR_CONSTEXPR20 auto get_param_length(T const& param) noexcept
{
return std::size(param);
}
// ---
void add_param(Buffer& buffer, uint8_t param) noexcept
{
buffer.add_uint8(param);
}
void add_param(Buffer& buffer, uint16_t param) noexcept
{
buffer.add_uint16(param);
}
void add_param(Buffer& buffer, uint32_t param) noexcept
{
buffer.add_uint32(param);
}
template<typename T>
void add_param(Buffer& buffer, T const& param) noexcept
{
buffer.add(param);
}
// ---
[[nodiscard]] std::string log_param(uint8_t param)
{
return fmt::format(" {:d}", static_cast<int>(param));
}
[[nodiscard]] std::string log_param(uint16_t param)
{
return fmt::format(" {:d}", static_cast<int>(param));
}
[[nodiscard]] std::string log_param(uint32_t param)
{
return fmt::format(" {:d}", static_cast<int>(param));
}
template<typename T>
[[nodiscard]] std::string log_param(T const& /*unused*/)
{
return " []";
}
template<typename... Args>
[[nodiscard]] std::string build_log_message(uint8_t type, Args const&... args)
{
auto text = fmt::format("sending '{:s}'", BtPeerMsgs::debug_name(type));
(text.append(log_param(args)), ...);
return text;
}
} // namespace
template<typename... Args>
void build_peer_message(tr_peerMsgsImpl const* const msgs, Buffer& out, uint8_t type, Args const&... args)
{
logtrace(msgs, build_log_message(type, args...));
auto const old_len = std::size(out);
auto msg_len = sizeof(type);
((msg_len += get_param_length(args)), ...);
out.reserve(old_len + msg_len);
out.add_uint32(msg_len);
out.add_uint8(type);
(add_param(out, args), ...);
TR_ASSERT(std::size(out) == old_len + sizeof(uint32_t) + msg_len);
TR_ASSERT(messageLengthIsCorrect(msgs->torrent, type, msg_len));
}
} // namespace protocol_send_message_helpers
template<typename... Args>
size_t protocol_send_message(tr_peerMsgsImpl const* const msgs, uint8_t type, Args const&... args)
{
using namespace protocol_send_message_helpers;
auto out = Buffer{};
build_peer_message(msgs, out, type, args...);
auto const n_bytes_added = std::size(out);
msgs->io->write(out, type == BtPeerMsgs::Piece);
return n_bytes_added;
}
size_t protocol_send_keepalive(tr_peerMsgsImpl* msgs)
{
logtrace(msgs, "sending 'keepalive'");
auto out = Buffer{};
out.add_uint32(0);
auto const n_bytes_added = std::size(out);
msgs->io->write(out, false);
return n_bytes_added;
}
auto protocolSendReject(tr_peerMsgsImpl* const msgs, struct peer_request const* req)
{
TR_ASSERT(msgs->io->supports_fext());
auto& out = msgs->outMessages;
out.add_uint32(sizeof(uint8_t) + 3 * sizeof(uint32_t));
out.add_uint8(BtPeerMsgs::FextReject);
out.add_uint32(req->index);
out.add_uint32(req->offset);
out.add_uint32(req->length);
logtrace(msgs, fmt::format(FMT_STRING("rejecting {:d}:{:d}->{:d}..."), req->index, req->offset, req->length));
msgs->dbgOutMessageLen();
return protocol_send_message(msgs, BtPeerMsgs::FextReject, req->index, req->offset, req->length);
}
void protocolSendCancel(tr_peerMsgsImpl* msgs, peer_request const& req)
size_t protocolSendCancel(tr_peerMsgsImpl* const msgs, peer_request const& req)
{
auto& out = msgs->outMessages;
out.add_uint32(sizeof(uint8_t) + 3 * sizeof(uint32_t));
out.add_uint8(BtPeerMsgs::Cancel);
out.add_uint32(req.index);
out.add_uint32(req.offset);
out.add_uint32(req.length);
logtrace(msgs, fmt::format(FMT_STRING("cancelling {:d}:{:d}->{:d}..."), req.index, req.offset, req.length));
msgs->dbgOutMessageLen();
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
return protocol_send_message(msgs, BtPeerMsgs::Cancel, req.index, req.offset, req.length);
}
void protocolSendPort(tr_peerMsgsImpl* msgs, tr_port port)
size_t protocolSendRequest(tr_peerMsgsImpl* const msgs, struct peer_request const& req)
{
auto& out = msgs->outMessages;
logtrace(msgs, fmt::format(FMT_STRING("sending Port {:d}"), port.host()));
out.add_uint32(3);
out.add_uint8(BtPeerMsgs::Port);
out.add_port(port);
TR_ASSERT(msgs->isValidRequest(req));
return protocol_send_message(msgs, BtPeerMsgs::Request, req.index, req.offset, req.length);
}
void protocolSendHave(tr_peerMsgsImpl* msgs, tr_piece_index_t index)
size_t protocolSendPort(tr_peerMsgsImpl* const msgs, tr_port port)
{
auto& out = msgs->outMessages;
out.add_uint32(sizeof(uint8_t) + sizeof(uint32_t));
out.add_uint8(BtPeerMsgs::Have);
out.add_uint32(index);
logtrace(msgs, fmt::format(FMT_STRING("sending Have {:d}"), index));
msgs->dbgOutMessageLen();
msgs->pokeBatchPeriod(LowPriorityIntervalSecs);
return protocol_send_message(msgs, BtPeerMsgs::Port, port.host());
}
void protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke)
size_t protocolSendHave(tr_peerMsgsImpl* const msgs, tr_piece_index_t index)
{
auto& out = msgs->outMessages;
out.add_uint32(sizeof(uint8_t));
out.add_uint8(choke ? BtPeerMsgs::Choke : BtPeerMsgs::Unchoke);
logtrace(msgs, choke ? "sending choke" : "sending unchoked");
msgs->dbgOutMessageLen();
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
return protocol_send_message(msgs, BtPeerMsgs::Have, index);
}
void protocolSendHaveAll(tr_peerMsgsImpl* msgs)
size_t protocolSendChoke(tr_peerMsgsImpl* const msgs, bool choke)
{
TR_ASSERT(msgs->io->supports_fext());
auto& out = msgs->outMessages;
out.add_uint32(sizeof(uint8_t));
out.add_uint8(BtPeerMsgs::FextHaveAll);
logtrace(msgs, "sending HAVE_ALL...");
msgs->dbgOutMessageLen();
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
}
void protocolSendHaveNone(tr_peerMsgsImpl* msgs)
{
TR_ASSERT(msgs->io->supports_fext());
auto& out = msgs->outMessages;
out.add_uint32(sizeof(uint8_t));
out.add_uint8(BtPeerMsgs::FextHaveNone);
logtrace(msgs, "sending HAVE_NONE...");
msgs->dbgOutMessageLen();
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
return protocol_send_message(msgs, choke ? BtPeerMsgs::Choke : BtPeerMsgs::Unchoke);
}
// --- INTEREST
void sendInterest(tr_peerMsgsImpl* msgs, bool b)
{
TR_ASSERT(msgs != nullptr);
auto& out = msgs->outMessages;
logtrace(msgs, b ? "Sending Interested" : "Sending Not Interested");
out.add_uint32(sizeof(uint8_t));
out.add_uint8(b ? BtPeerMsgs::Interested : BtPeerMsgs::NotInterested);
msgs->pokeBatchPeriod(HighPriorityIntervalSecs);
msgs->dbgOutMessageLen();
protocol_send_message(msgs, b ? BtPeerMsgs::Interested : BtPeerMsgs::NotInterested);
}
bool popNextMetadataRequest(tr_peerMsgsImpl* msgs, int* setme)
std::optional<int> popNextMetadataRequest(tr_peerMsgsImpl* msgs)
{
if (std::empty(msgs->peerAskedForMetadata))
auto& reqs = msgs->peerAskedForMetadata;
if (std::empty(reqs))
{
return false;
return {};
}
auto& reqs = msgs->peerAskedForMetadata;
*setme = reqs.front();
auto next = reqs.front();
reqs.pop();
return true;
return next;
}
void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs)
@@ -844,7 +898,6 @@ void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs)
void sendLtepHandshake(tr_peerMsgsImpl* msgs)
{
auto& out = msgs->outMessages;
static tr_quark version_quark = 0;
if (msgs->clientSentLtepHandshake)
@@ -952,14 +1005,7 @@ void sendLtepHandshake(tr_peerMsgsImpl* msgs)
}
}
auto payload = tr_variantToStr(&val, TR_VARIANT_FMT_BENC);
out.add_uint32(2 * sizeof(uint8_t) + std::size(payload));
out.add_uint8(BtPeerMsgs::Ltep);
out.add_uint8(LtepMessages::Handshake);
out.add(payload);
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
msgs->dbgOutMessageLen();
protocol_send_message(msgs, BtPeerMsgs::Ltep, LtepMessages::Handshake, tr_variantToStr(&val, TR_VARIANT_FMT_BENC));
/* cleanup */
tr_variantClear(&val);
@@ -1119,24 +1165,12 @@ void parseUtMetadata(tr_peerMsgsImpl* msgs, libtransmission::Buffer& payload_in)
}
else
{
auto& out = msgs->outMessages;
/* build the rejection message */
/* send a rejection message */
auto v = tr_variant{};
tr_variantInitDict(&v, 2);
tr_variantDictAddInt(&v, TR_KEY_msg_type, MetadataMsgType::Reject);
tr_variantDictAddInt(&v, TR_KEY_piece, piece);
auto const payload = tr_variantToStr(&v, TR_VARIANT_FMT_BENC);
/* write it out as a LTEP message to our outMessages buffer */
out.add_uint32(2 * sizeof(uint8_t) + std::size(payload));
out.add_uint8(BtPeerMsgs::Ltep);
out.add_uint8(msgs->ut_metadata_id);
out.add(payload);
msgs->pokeBatchPeriod(HighPriorityIntervalSecs);
msgs->dbgOutMessageLen();
/* cleanup */
protocol_send_message(msgs, BtPeerMsgs::Ltep, msgs->ut_metadata_id, tr_variantToStr(&v, TR_VARIANT_FMT_BENC));
tr_variantClear(&v);
}
}
@@ -1288,57 +1322,6 @@ void peerMadeRequest(tr_peerMsgsImpl* msgs, struct peer_request const* req)
}
}
bool messageLengthIsCorrect(tr_peerMsgsImpl const* msg, uint8_t id, uint32_t len)
{
switch (id)
{
case BtPeerMsgs::Choke:
case BtPeerMsgs::Unchoke:
case BtPeerMsgs::Interested:
case BtPeerMsgs::NotInterested:
case BtPeerMsgs::FextHaveAll:
case BtPeerMsgs::FextHaveNone:
return len == 1;
case BtPeerMsgs::Have:
case BtPeerMsgs::FextSuggest:
case BtPeerMsgs::FextAllowedFast:
return len == 5;
case BtPeerMsgs::Bitfield:
if (msg->torrent->hasMetainfo())
{
return len == (msg->torrent->pieceCount() >> 3) + ((msg->torrent->pieceCount() & 7) != 0 ? 1 : 0) + 1U;
}
/* we don't know the piece count yet,
so we can only guess whether to send true or false */
if (msg->metadata_size_hint > 0)
{
return len <= msg->metadata_size_hint;
}
return true;
case BtPeerMsgs::Request:
case BtPeerMsgs::Cancel:
case BtPeerMsgs::FextReject:
return len == 13;
case BtPeerMsgs::Piece:
return len > 9 && len <= 16393;
case BtPeerMsgs::Port:
return len == 3;
case BtPeerMsgs::Ltep:
return len >= 2;
default:
return false;
}
}
int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>> block_data, tr_block_index_t block);
ReadResult read_piece_data(tr_peerMsgsImpl* msgs, libtransmission::Buffer& payload)
@@ -1398,7 +1381,7 @@ ReadResult process_peer_message(tr_peerMsgsImpl* msgs, uint8_t id, libtransmissi
static_cast<int>(id),
std::size(payload)));
if (!messageLengthIsCorrect(msgs, id, sizeof(id) + std::size(payload)))
if (!messageLengthIsCorrect(msgs->torrent, id, sizeof(id) + std::size(payload)))
{
logdbg(
msgs,
@@ -1748,15 +1731,15 @@ ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece)
}
// read <payload>
auto& current_payload = msgs->incoming.payload;
auto& payload = msgs->incoming.payload;
auto const full_payload_len = *current_message_len - sizeof(uint8_t /*message_type*/);
auto n_left = full_payload_len - std::size(current_payload);
auto n_left = full_payload_len - std::size(payload);
while (n_left > 0U && io->read_buffer_size() > 0U)
{
auto buf = std::array<char, tr_block_info::BlockSize>{};
auto const n_this_pass = std::min({ n_left, io->read_buffer_size(), std::size(buf) });
io->read_bytes(std::data(buf), n_this_pass);
current_payload.add(std::data(buf), n_this_pass);
payload.add(std::data(buf), n_this_pass);
n_left -= n_this_pass;
logtrace(msgs, fmt::format("read {:d} payload bytes; {:d} left to go", n_this_pass, n_left));
}
@@ -1766,17 +1749,10 @@ ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece)
return READ_LATER;
}
// The incoming message is now complete. Reset the peerMsgs' incoming
// field so it's ready to receive the next message, then process the
// current one with `process_peer_message()`.
current_message_len.reset();
auto const message_type = *current_message_type;
auto const [read_state, n_piece_bytes_read] = process_peer_message(msgs, *current_message_type, payload);
current_message_type.reset();
auto payload = libtransmission::Buffer{};
std::swap(payload, current_payload);
auto const [read_state, n_piece_bytes_read] = process_peer_message(msgs, message_type, payload);
current_message_len.reset();
payload.clear();
*piece = n_piece_bytes_read;
return read_state;
}
@@ -1797,26 +1773,11 @@ void updateMetadataRequests(tr_peerMsgsImpl* msgs, time_t now)
if (auto const piece = tr_torrentGetNextMetadataRequest(msgs->torrent, now); piece)
{
auto& out = msgs->outMessages;
/* build the data message */
auto tmp = tr_variant{};
tr_variantInitDict(&tmp, 3);
tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Request);
tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece);
auto const payload = tr_variantToStr(&tmp, TR_VARIANT_FMT_BENC);
logtrace(msgs, fmt::format(FMT_STRING("requesting metadata piece #{:d}"), *piece));
/* write it out as a LTEP message to our outMessages buffer */
out.add_uint32(2 * sizeof(uint8_t) + std::size(payload));
out.add_uint8(BtPeerMsgs::Ltep);
out.add_uint8(msgs->ut_metadata_id);
out.add(payload);
msgs->pokeBatchPeriod(HighPriorityIntervalSecs);
msgs->dbgOutMessageLen();
/* cleanup */
protocol_send_message(msgs, BtPeerMsgs::Ltep, msgs->ut_metadata_id, tr_variantToStr(&tmp, TR_VARIANT_FMT_BENC));
tr_variantClear(&tmp);
}
}
@@ -1851,176 +1812,136 @@ void updateBlockRequests(tr_peerMsgsImpl* msgs)
}
}
size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now)
namespace peer_pulse_helpers
{
size_t bytes_written = 0;
struct peer_request req;
bool const have_messages = !std::empty(msgs->outMessages);
bool const fext = msgs->io->supports_fext();
[[nodiscard]] size_t add_next_metadata_piece(tr_peerMsgsImpl* msgs)
{
auto const piece = popNextMetadataRequest(msgs);
// --- Protocol messages
if (have_messages && msgs->outMessagesBatchedAt == 0) /* fresh batch */
if (!piece.has_value()) // no pending requests
{
logtrace(msgs, fmt::format(FMT_STRING("started an outMessages batch (length is {:d})"), std::size(msgs->outMessages)));
msgs->outMessagesBatchedAt = now;
}
else if (have_messages && now - msgs->outMessagesBatchedAt >= msgs->outMessagesBatchPeriod)
{
auto const len = std::size(msgs->outMessages);
/* flush the protocol messages */
logtrace(msgs, fmt::format(FMT_STRING("flushing outMessages... to {:p} (length is {:d})"), fmt::ptr(msgs->io), len));
msgs->io->write(msgs->outMessages, false);
msgs->clientSentAnythingAt = now;
msgs->outMessagesBatchedAt = 0;
msgs->outMessagesBatchPeriod = LowPriorityIntervalSecs;
bytes_written += len;
return {};
}
// --- Metadata Pieces
if (auto piece = int{};
msgs->io->get_write_buffer_space(now) >= METADATA_PIECE_SIZE && popNextMetadataRequest(msgs, &piece))
auto const data = tr_torrentGetMetadataPiece(msgs->torrent, *piece);
if (!data.has_value())
{
auto ok = bool{ false };
// send a reject
auto tmp = tr_variant{};
tr_variantInitDict(&tmp, 2);
tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Reject);
tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece);
auto const n_bytes_written = protocol_send_message(
msgs,
BtPeerMsgs::Ltep,
msgs->ut_metadata_id,
tr_variantToStr(&tmp, TR_VARIANT_FMT_BENC));
tr_variantClear(&tmp);
return n_bytes_written;
}
if (auto const piece_data = tr_torrentGetMetadataPiece(msgs->torrent, piece); piece_data)
// send the metadata
auto const data_sv = std::string_view{ reinterpret_cast<char const*>(std::data(*data)), std::size(*data) };
auto tmp = tr_variant{};
tr_variantInitDict(&tmp, 3);
tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Data);
tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece);
tr_variantDictAddInt(&tmp, TR_KEY_total_size, msgs->torrent->infoDictSize());
auto const n_bytes_written = protocol_send_message(
msgs,
BtPeerMsgs::Ltep,
msgs->ut_metadata_id,
tr_variantToStr(&tmp, TR_VARIANT_FMT_BENC),
data_sv);
tr_variantClear(&tmp);
return n_bytes_written;
}
[[nodiscard]] size_t add_next_piece(tr_peerMsgsImpl* msgs, uint64_t now)
{
if (msgs->io->get_write_buffer_space(now) == 0U || std::empty(msgs->peer_requested_))
{
return {};
}
auto const req = msgs->peer_requested_.front();
msgs->peer_requested_.erase(std::begin(msgs->peer_requested_));
auto buf = std::array<uint8_t, tr_block_info::BlockSize>{};
auto ok = msgs->isValidRequest(req) && msgs->torrent->hasPiece(req.index);
if (ok)
{
ok = msgs->torrent->ensurePieceIsChecked(req.index);
if (!ok)
{
auto& out = msgs->outMessages;
/* build the data message */
auto tmp = tr_variant{};
tr_variantInitDict(&tmp, 3);
tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Data);
tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
tr_variantDictAddInt(&tmp, TR_KEY_total_size, msgs->torrent->infoDictSize());
auto const payload = tr_variantToStr(&tmp, TR_VARIANT_FMT_BENC);
/* write it out as a LTEP message to our outMessages buffer */
out.add_uint32(2 * sizeof(uint8_t) + std::size(payload) + std::size(*piece_data));
out.add_uint8(BtPeerMsgs::Ltep);
out.add_uint8(msgs->ut_metadata_id);
out.add(payload);
out.add(*piece_data);
msgs->pokeBatchPeriod(HighPriorityIntervalSecs);
msgs->dbgOutMessageLen();
tr_variantClear(&tmp);
ok = true;
}
if (!ok) /* send a rejection message */
{
auto& out = msgs->outMessages;
/* build the rejection message */
auto tmp = tr_variant{};
tr_variantInitDict(&tmp, 2);
tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Reject);
tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
auto payload = tr_variantToStr(&tmp, TR_VARIANT_FMT_BENC);
/* write it out as a LTEP message to our outMessages buffer */
out.add_uint32(2 * sizeof(uint8_t) + std::size(payload));
out.add_uint8(BtPeerMsgs::Ltep);
out.add_uint8(msgs->ut_metadata_id);
out.add(payload);
msgs->pokeBatchPeriod(HighPriorityIntervalSecs);
msgs->dbgOutMessageLen();
tr_variantClear(&tmp);
msgs->torrent->setLocalError(
fmt::format(FMT_STRING("Please Verify Local Data! Piece #{:d} is corrupt."), req.index));
}
}
// --- Data Blocks
if (msgs->io->get_write_buffer_space(now) >= tr_block_info::BlockSize && !std::empty(msgs->peer_requested_))
if (ok)
{
req = msgs->peer_requested_.front();
msgs->peer_requested_.erase(std::begin(msgs->peer_requested_));
ok = msgs->session->cache
->readBlock(msgs->torrent, msgs->torrent->pieceLoc(req.index, req.offset), req.length, std::data(buf)) == 0;
}
if (msgs->isValidRequest(req) && msgs->torrent->hasPiece(req.index))
if (ok)
{
auto const piece_data = std::string_view{ reinterpret_cast<char const*>(std::data(buf)), req.length };
return protocol_send_message(msgs, BtPeerMsgs::Piece, req.index, req.offset, piece_data);
}
if (msgs->io->supports_fext())
{
return protocolSendReject(msgs, &req);
}
return {};
}
[[nodiscard]] size_t fill_output_buffer(tr_peerMsgsImpl* msgs, time_t now)
{
auto n_bytes_written = size_t{};
// fulfill metadata requests
for (;;)
{
auto const old_len = n_bytes_written;
n_bytes_written += add_next_metadata_piece(msgs);
if (old_len == n_bytes_written)
{
uint32_t const msglen = 4 + 1 + 4 + 4 + req.length;
auto out = libtransmission::Buffer{};
out.reserve(msglen);
out.add_uint32(sizeof(uint8_t) + 2 * sizeof(uint32_t) + req.length);
out.add_uint8(BtPeerMsgs::Piece);
out.add_uint32(req.index);
out.add_uint32(req.offset);
auto buf = std::array<uint8_t, tr_block_info::BlockSize>{};
bool err = msgs->session->cache->readBlock(
msgs->torrent,
msgs->torrent->pieceLoc(req.index, req.offset),
req.length,
std::data(buf)) != 0;
out.add(std::data(buf), req.length);
/* check the piece if it needs checking... */
if (!err)
{
err = !msgs->torrent->ensurePieceIsChecked(req.index);
if (err)
{
msgs->torrent->setLocalError(
fmt::format(FMT_STRING("Please Verify Local Data! Piece #{:d} is corrupt."), req.index));
}
}
if (err)
{
if (fext)
{
protocolSendReject(msgs, &req);
}
}
else
{
logtrace(msgs, fmt::format(FMT_STRING("sending block {:d}:{:d}->{:d}"), req.index, req.offset, req.length));
auto const n = std::size(out);
TR_ASSERT(n == msglen);
msgs->io->write(out, true);
bytes_written += n;
msgs->clientSentAnythingAt = now;
msgs->blocks_sent_to_peer.add(tr_time(), 1);
}
if (err)
{
bytes_written = 0;
msgs = nullptr;
}
}
else if (fext) /* peer needs a reject message */
{
protocolSendReject(msgs, &req);
}
if (msgs != nullptr)
{
prefetchPieces(msgs);
break;
}
}
// --- Keepalive
// fulfill piece requests
for (;;)
{
auto const old_len = n_bytes_written;
n_bytes_written += add_next_piece(msgs, now);
if (old_len == n_bytes_written)
{
break;
}
}
if (msgs != nullptr && msgs->clientSentAnythingAt != 0 && now - msgs->clientSentAnythingAt > KeepaliveIntervalSecs)
{
logtrace(msgs, "sending a keepalive message");
msgs->outMessages.add_uint32(0);
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
n_bytes_written += protocol_send_keepalive(msgs);
}
return bytes_written;
return n_bytes_written;
}
} // namespace peer_pulse_helpers
void peerPulse(void* vmsgs)
{
using namespace peer_pulse_helpers;
auto* msgs = static_cast<tr_peerMsgsImpl*>(vmsgs);
time_t const now = tr_time();
auto const now = tr_time();
updateDesiredRequestCount(msgs);
updateBlockRequests(msgs);
@@ -2028,7 +1949,7 @@ void peerPulse(void* vmsgs)
for (;;)
{
if (fillOutputBuffer(msgs, now) < 1)
if (fill_output_buffer(msgs, now) == 0U)
{
break;
}
@@ -2040,35 +1961,21 @@ void gotError(tr_peerIo* /*io*/, tr_error const& /*error*/, void* vmsgs)
static_cast<tr_peerMsgsImpl*>(vmsgs)->publish(tr_peer_event::GotError(ENOTCONN));
}
void sendBitfield(tr_peerMsgsImpl* msgs)
{
TR_ASSERT(msgs->torrent->hasMetainfo());
auto& out = msgs->outMessages;
auto bytes = msgs->torrent->createPieceBitfield();
out.add_uint32(sizeof(uint8_t) + bytes.size());
out.add_uint8(BtPeerMsgs::Bitfield);
out.add(bytes);
logtrace(msgs, fmt::format(FMT_STRING("sending bitfield... outMessage size is now {:d}"), std::size(out)));
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
}
void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs)
{
bool const fext = msgs->io->supports_fext();
if (fext && msgs->torrent->hasAll())
{
protocolSendHaveAll(msgs);
protocol_send_message(msgs, BtPeerMsgs::FextHaveAll);
}
else if (fext && msgs->torrent->hasNone())
{
protocolSendHaveNone(msgs);
protocol_send_message(msgs, BtPeerMsgs::FextHaveNone);
}
else if (!msgs->torrent->hasNone())
{
sendBitfield(msgs);
protocol_send_message(msgs, BtPeerMsgs::Bitfield, msgs->torrent->createPieceBitfield());
}
}
@@ -2127,8 +2034,6 @@ void tr_peerMsgsImpl::sendPex()
return;
}
auto& out = this->outMessages;
// update msgs
std::swap(old4, new4);
std::swap(old6, new6);
@@ -2201,15 +2106,7 @@ void tr_peerMsgsImpl::sendPex()
tr_variantDictAddRaw(&val, TR_KEY_dropped6, std::data(tmpbuf), std::size(tmpbuf));
}
/* write the pex message */
auto payload = tr_variantToStr(&val, TR_VARIANT_FMT_BENC);
out.add_uint32(2 * sizeof(uint8_t) + std::size(payload));
out.add_uint8(BtPeerMsgs::Ltep);
out.add_uint8(this->ut_pex_id);
out.add(payload);
this->pokeBatchPeriod(HighPriorityIntervalSecs);
logtrace(this, fmt::format(FMT_STRING("sending a pex message; outMessage size is now {:d}"), std::size(out)));
this->dbgOutMessageLen();
protocol_send_message(this, BtPeerMsgs::Ltep, this->ut_pex_id, tr_variantToStr(&val, TR_VARIANT_FMT_BENC));
tr_variantClear(&val);
}