diff --git a/libtransmission/peer-msgs.cc b/libtransmission/peer-msgs.cc index 4eed0b0b1..88c534ca9 100644 --- a/libtransmission/peer-msgs.cc +++ b/libtransmission/peer-msgs.cc @@ -46,6 +46,7 @@ #endif using namespace std::literals; +using Buffer = libtransmission::Buffer; namespace { @@ -166,9 +167,6 @@ auto constexpr MetadataReqQ = int{ 64 }; auto constexpr ReqQ = int{ 512 }; // used in lowering the outMessages queue period -auto constexpr ImmediatePriorityIntervalSecs = int{ 0 }; -auto constexpr HighPriorityIntervalSecs = int{ 2 }; -auto constexpr LowPriorityIntervalSecs = int{ 10 }; // how many blocks to keep prefetched per peer auto constexpr PrefetchMax = size_t{ 18 }; @@ -219,7 +217,7 @@ struct tr_incoming { std::optional length; // the full message payload length. Includes the +1 for id length std::optional id; // the protocol message, e.g. BtPeerMsgs::Piece - libtransmission::Buffer payload; + Buffer payload; struct incoming_piece_data { @@ -243,10 +241,11 @@ void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs); void didWrite(tr_peerIo* io, size_t bytes_written, bool was_piece_data, void* vmsgs); void gotError(tr_peerIo* io, tr_error const& err, void* vmsgs); void peerPulse(void* vmsgs); -void protocolSendCancel(tr_peerMsgsImpl* msgs, struct peer_request const& req); -void protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke); -void protocolSendHave(tr_peerMsgsImpl* msgs, tr_piece_index_t index); -void protocolSendPort(tr_peerMsgsImpl* msgs, tr_port port); +size_t protocolSendCancel(tr_peerMsgsImpl* msgs, struct peer_request const& req); +size_t protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke); +size_t protocolSendHave(tr_peerMsgsImpl* msgs, tr_piece_index_t index); +size_t protocolSendPort(tr_peerMsgsImpl* msgs, tr_port port); +size_t protocolSendRequest(tr_peerMsgsImpl* msgs, struct peer_request const& req); void sendInterest(tr_peerMsgsImpl* msgs, bool b); void sendLtepHandshake(tr_peerMsgsImpl* msgs); void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs); @@ -255,7 +254,7 @@ void updateDesiredRequestCount(tr_peerMsgsImpl* msgs); #define myLogMacro(msgs, level, text) \ do \ { \ - if (tr_logLevelIsActive(level)) \ + if (true || tr_logLevelIsActive(level)) \ { \ tr_logAddMessage( \ __FILE__, \ @@ -297,7 +296,6 @@ public: tr_peer_callback callback, void* callback_data) : tr_peerMsgs{ torrent_in, atom_in, client, io_in->is_encrypted(), io_in->is_incoming(), io_in->is_utp() } - , outMessagesBatchPeriod{ LowPriorityIntervalSecs } , torrent{ torrent_in } , io{ std::move(io_in) } , have_{ torrent_in->pieceCount() } @@ -354,20 +352,6 @@ public: } } - void dbgOutMessageLen() const - { - logtrace(this, fmt::format(FMT_STRING("outMessage size is now {:d}"), std::size(outMessages))); - } - - void pokeBatchPeriod(int interval) - { - if (outMessagesBatchPeriod > interval) - { - outMessagesBatchPeriod = interval; - logtrace(this, fmt::format(FMT_STRING("lowering batch interval to {:d} seconds"), interval)); - } - } - bool isTransferringPieces(uint64_t now, tr_direction dir, tr_bytes_per_second_t* setme_bytes_per_second) const override { auto const bytes_per_second = io->get_piece_speed_bytes_per_second(now, dir); @@ -515,7 +499,7 @@ public: auto const left_in_block = block_size - loc.block_offset; auto const left_in_piece = torrent->pieceSize(loc.piece) - loc.piece_offset; auto const req_len = std::min(left_in_block, left_in_piece); - protocolSendRequest({ loc.piece, loc.piece_offset, req_len }); + protocolSendRequest(this, { loc.piece, loc.piece_offset, req_len }); offset += req_len; } } @@ -581,22 +565,6 @@ private: return max_reqs; } - void protocolSendRequest(struct peer_request const& req) - { - TR_ASSERT(isValidRequest(req)); - - auto& out = outMessages; - out.add_uint32(sizeof(uint8_t) + 3 * sizeof(uint32_t)); - out.add_uint8(BtPeerMsgs::Request); - out.add_uint32(req.index); - out.add_uint32(req.offset); - out.add_uint32(req.length); - - logtrace(this, fmt::format(FMT_STRING("requesting {:d}:{:d}->{:d}..."), req.index, req.offset, req.length)); - dbgOutMessageLen(); - pokeBatchPeriod(ImmediatePriorityIntervalSecs); - } - [[nodiscard]] bool calculate_active(tr_direction direction) const { if (direction == TR_CLIENT_TO_PEER) @@ -637,11 +605,6 @@ public: size_t desired_request_count = 0; - /* how long the outMessages batch should be allowed to grow before - * it's flushed -- some messages (like requests >:) should be sent - * very quickly; others aren't as urgent. */ - int8_t outMessagesBatchPeriod; - uint8_t ut_pex_id = 0; uint8_t ut_metadata_id = 0; @@ -653,8 +616,6 @@ public: tr_torrent* const torrent; - libtransmission::Buffer outMessages; /* all the non-piece messages */ - std::shared_ptr const io; struct QueuedPeerRequest : public peer_request @@ -679,7 +640,7 @@ public: time_t chokeChangedAt = 0; /* when we started batching the outMessages */ - time_t outMessagesBatchedAt = 0; + // time_t outMessagesBatchedAt = 0; struct tr_incoming incoming = {}; @@ -704,127 +665,220 @@ private: // --- -void protocolSendReject(tr_peerMsgsImpl* msgs, struct peer_request const* req) +[[nodiscard]] constexpr bool messageLengthIsCorrect(tr_torrent const* const tor, uint8_t id, uint32_t len) +{ + switch (id) + { + case BtPeerMsgs::Choke: + case BtPeerMsgs::Unchoke: + case BtPeerMsgs::Interested: + case BtPeerMsgs::NotInterested: + case BtPeerMsgs::FextHaveAll: + case BtPeerMsgs::FextHaveNone: + return len == 1U; + + case BtPeerMsgs::Have: + case BtPeerMsgs::FextSuggest: + case BtPeerMsgs::FextAllowedFast: + return len == 5U; + + case BtPeerMsgs::Bitfield: + return !tor->hasMetainfo() || len == 1 + ((tor->pieceCount() + 7U) / 8U); + + case BtPeerMsgs::Request: + case BtPeerMsgs::Cancel: + case BtPeerMsgs::FextReject: + return len == 13U; + + case BtPeerMsgs::Piece: + len -= sizeof(id) + sizeof(uint32_t /*piece*/) + sizeof(uint32_t /*offset*/); + return len <= tr_block_info::BlockSize; + + case BtPeerMsgs::Port: + return len == 3U; + + case BtPeerMsgs::Ltep: + return len >= 2U; + + default: // unrecognized message + return false; + } +} + +namespace protocol_send_message_helpers +{ +namespace +{ + +[[nodiscard]] constexpr auto get_param_length(uint8_t param) noexcept +{ + return sizeof(param); +} + +[[nodiscard]] constexpr auto get_param_length(uint16_t param) noexcept +{ + return sizeof(param); +} + +[[nodiscard]] constexpr auto get_param_length(uint32_t param) noexcept +{ + return sizeof(param); +} + +template +[[nodiscard]] TR_CONSTEXPR20 auto get_param_length(T const& param) noexcept +{ + return std::size(param); +} + +// --- + +void add_param(Buffer& buffer, uint8_t param) noexcept +{ + buffer.add_uint8(param); +} + +void add_param(Buffer& buffer, uint16_t param) noexcept +{ + buffer.add_uint16(param); +} + +void add_param(Buffer& buffer, uint32_t param) noexcept +{ + buffer.add_uint32(param); +} + +template +void add_param(Buffer& buffer, T const& param) noexcept +{ + buffer.add(param); +} + +// --- + +[[nodiscard]] std::string log_param(uint8_t param) +{ + return fmt::format(" {:d}", static_cast(param)); +} + +[[nodiscard]] std::string log_param(uint16_t param) +{ + return fmt::format(" {:d}", static_cast(param)); +} + +[[nodiscard]] std::string log_param(uint32_t param) +{ + return fmt::format(" {:d}", static_cast(param)); +} + +template +[[nodiscard]] std::string log_param(T const& /*unused*/) +{ + return " []"; +} + +template +[[nodiscard]] std::string build_log_message(uint8_t type, Args const&... args) +{ + auto text = fmt::format("sending '{:s}'", BtPeerMsgs::debug_name(type)); + (text.append(log_param(args)), ...); + return text; +} +} // namespace + +template +void build_peer_message(tr_peerMsgsImpl const* const msgs, Buffer& out, uint8_t type, Args const&... args) +{ + logtrace(msgs, build_log_message(type, args...)); + + auto const old_len = std::size(out); + auto msg_len = sizeof(type); + ((msg_len += get_param_length(args)), ...); + out.reserve(old_len + msg_len); + out.add_uint32(msg_len); + out.add_uint8(type); + (add_param(out, args), ...); + + TR_ASSERT(old_len + sizeof(uint32_t) + msg_len); + TR_ASSERT(messageLengthIsCorrect(msgs->torrent, type, msg_len)); +} +} // namespace protocol_send_message_helpers + +template +size_t protocol_send_message(tr_peerMsgsImpl const* const msgs, uint8_t type, Args const&... args) +{ + using namespace protocol_send_message_helpers; + + auto out = Buffer{}; + build_peer_message(msgs, out, type, args...); + auto const n_bytes_added = std::size(out); + msgs->io->write(out, type == BtPeerMsgs::Piece); + return n_bytes_added; +} + +size_t protocol_send_keepalive(tr_peerMsgsImpl* msgs) +{ + logtrace(msgs, "sending 'keepalive'"); + + auto out = Buffer{}; + out.add_uint32(0); + + auto const n_bytes_added = std::size(out); + msgs->io->write(out, false); + return n_bytes_added; +} + +auto protocolSendReject(tr_peerMsgsImpl* const msgs, struct peer_request const* req) { TR_ASSERT(msgs->io->supports_fext()); - - auto& out = msgs->outMessages; - - out.add_uint32(sizeof(uint8_t) + 3 * sizeof(uint32_t)); - out.add_uint8(BtPeerMsgs::FextReject); - out.add_uint32(req->index); - out.add_uint32(req->offset); - out.add_uint32(req->length); - - logtrace(msgs, fmt::format(FMT_STRING("rejecting {:d}:{:d}->{:d}..."), req->index, req->offset, req->length)); - msgs->dbgOutMessageLen(); + return protocol_send_message(msgs, BtPeerMsgs::FextReject, req->index, req->offset, req->length); } -void protocolSendCancel(tr_peerMsgsImpl* msgs, peer_request const& req) +size_t protocolSendCancel(tr_peerMsgsImpl* const msgs, peer_request const& req) { - auto& out = msgs->outMessages; - - out.add_uint32(sizeof(uint8_t) + 3 * sizeof(uint32_t)); - out.add_uint8(BtPeerMsgs::Cancel); - out.add_uint32(req.index); - out.add_uint32(req.offset); - out.add_uint32(req.length); - - logtrace(msgs, fmt::format(FMT_STRING("cancelling {:d}:{:d}->{:d}..."), req.index, req.offset, req.length)); - msgs->dbgOutMessageLen(); - msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs); + return protocol_send_message(msgs, BtPeerMsgs::Cancel, req.index, req.offset, req.length); } -void protocolSendPort(tr_peerMsgsImpl* msgs, tr_port port) +size_t protocolSendRequest(tr_peerMsgsImpl* const msgs, struct peer_request const& req) { - auto& out = msgs->outMessages; - - logtrace(msgs, fmt::format(FMT_STRING("sending Port {:d}"), port.host())); - out.add_uint32(3); - out.add_uint8(BtPeerMsgs::Port); - out.add_port(port); + TR_ASSERT(msgs->isValidRequest(req)); + return protocol_send_message(msgs, BtPeerMsgs::Request, req.index, req.offset, req.length); } -void protocolSendHave(tr_peerMsgsImpl* msgs, tr_piece_index_t index) +size_t protocolSendPort(tr_peerMsgsImpl* const msgs, tr_port port) { - auto& out = msgs->outMessages; - - out.add_uint32(sizeof(uint8_t) + sizeof(uint32_t)); - out.add_uint8(BtPeerMsgs::Have); - out.add_uint32(index); - - logtrace(msgs, fmt::format(FMT_STRING("sending Have {:d}"), index)); - msgs->dbgOutMessageLen(); - msgs->pokeBatchPeriod(LowPriorityIntervalSecs); + return protocol_send_message(msgs, BtPeerMsgs::Port, port.host()); } -void protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke) +size_t protocolSendHave(tr_peerMsgsImpl* const msgs, tr_piece_index_t index) { - auto& out = msgs->outMessages; - - out.add_uint32(sizeof(uint8_t)); - out.add_uint8(choke ? BtPeerMsgs::Choke : BtPeerMsgs::Unchoke); - - logtrace(msgs, choke ? "sending choke" : "sending unchoked"); - msgs->dbgOutMessageLen(); - msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs); + return protocol_send_message(msgs, BtPeerMsgs::Have, index); } -void protocolSendHaveAll(tr_peerMsgsImpl* msgs) +size_t protocolSendChoke(tr_peerMsgsImpl* const msgs, bool choke) { - TR_ASSERT(msgs->io->supports_fext()); - - auto& out = msgs->outMessages; - - out.add_uint32(sizeof(uint8_t)); - out.add_uint8(BtPeerMsgs::FextHaveAll); - - logtrace(msgs, "sending HAVE_ALL..."); - msgs->dbgOutMessageLen(); - msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs); -} - -void protocolSendHaveNone(tr_peerMsgsImpl* msgs) -{ - TR_ASSERT(msgs->io->supports_fext()); - - auto& out = msgs->outMessages; - - out.add_uint32(sizeof(uint8_t)); - out.add_uint8(BtPeerMsgs::FextHaveNone); - - logtrace(msgs, "sending HAVE_NONE..."); - msgs->dbgOutMessageLen(); - msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs); + return protocol_send_message(msgs, choke ? BtPeerMsgs::Choke : BtPeerMsgs::Unchoke); } // --- INTEREST void sendInterest(tr_peerMsgsImpl* msgs, bool b) { - TR_ASSERT(msgs != nullptr); - - auto& out = msgs->outMessages; - - logtrace(msgs, b ? "Sending Interested" : "Sending Not Interested"); - out.add_uint32(sizeof(uint8_t)); - out.add_uint8(b ? BtPeerMsgs::Interested : BtPeerMsgs::NotInterested); - - msgs->pokeBatchPeriod(HighPriorityIntervalSecs); - msgs->dbgOutMessageLen(); + protocol_send_message(msgs, b ? BtPeerMsgs::Interested : BtPeerMsgs::NotInterested); } -bool popNextMetadataRequest(tr_peerMsgsImpl* msgs, int* setme) +std::optional popNextMetadataRequest(tr_peerMsgsImpl* msgs) { - if (std::empty(msgs->peerAskedForMetadata)) + auto& reqs = msgs->peerAskedForMetadata; + + if (std::empty(reqs)) { - return false; + return {}; } - auto& reqs = msgs->peerAskedForMetadata; - *setme = reqs.front(); + auto next = reqs.front(); reqs.pop(); - return true; + return next; } void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs) @@ -844,7 +898,6 @@ void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs) void sendLtepHandshake(tr_peerMsgsImpl* msgs) { - auto& out = msgs->outMessages; static tr_quark version_quark = 0; if (msgs->clientSentLtepHandshake) @@ -952,14 +1005,7 @@ void sendLtepHandshake(tr_peerMsgsImpl* msgs) } } - auto payload = tr_variantToStr(&val, TR_VARIANT_FMT_BENC); - - out.add_uint32(2 * sizeof(uint8_t) + std::size(payload)); - out.add_uint8(BtPeerMsgs::Ltep); - out.add_uint8(LtepMessages::Handshake); - out.add(payload); - msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs); - msgs->dbgOutMessageLen(); + protocol_send_message(msgs, BtPeerMsgs::Ltep, LtepMessages::Handshake, tr_variantToStr(&val, TR_VARIANT_FMT_BENC)); /* cleanup */ tr_variantClear(&val); @@ -1119,24 +1165,12 @@ void parseUtMetadata(tr_peerMsgsImpl* msgs, libtransmission::Buffer& payload_in) } else { - auto& out = msgs->outMessages; - - /* build the rejection message */ + /* send a rejection message */ auto v = tr_variant{}; tr_variantInitDict(&v, 2); tr_variantDictAddInt(&v, TR_KEY_msg_type, MetadataMsgType::Reject); tr_variantDictAddInt(&v, TR_KEY_piece, piece); - auto const payload = tr_variantToStr(&v, TR_VARIANT_FMT_BENC); - - /* write it out as a LTEP message to our outMessages buffer */ - out.add_uint32(2 * sizeof(uint8_t) + std::size(payload)); - out.add_uint8(BtPeerMsgs::Ltep); - out.add_uint8(msgs->ut_metadata_id); - out.add(payload); - msgs->pokeBatchPeriod(HighPriorityIntervalSecs); - msgs->dbgOutMessageLen(); - - /* cleanup */ + protocol_send_message(msgs, BtPeerMsgs::Ltep, msgs->ut_metadata_id, tr_variantToStr(&v, TR_VARIANT_FMT_BENC)); tr_variantClear(&v); } } @@ -1288,57 +1322,6 @@ void peerMadeRequest(tr_peerMsgsImpl* msgs, struct peer_request const* req) } } -bool messageLengthIsCorrect(tr_peerMsgsImpl const* msg, uint8_t id, uint32_t len) -{ - switch (id) - { - case BtPeerMsgs::Choke: - case BtPeerMsgs::Unchoke: - case BtPeerMsgs::Interested: - case BtPeerMsgs::NotInterested: - case BtPeerMsgs::FextHaveAll: - case BtPeerMsgs::FextHaveNone: - return len == 1; - - case BtPeerMsgs::Have: - case BtPeerMsgs::FextSuggest: - case BtPeerMsgs::FextAllowedFast: - return len == 5; - - case BtPeerMsgs::Bitfield: - if (msg->torrent->hasMetainfo()) - { - return len == (msg->torrent->pieceCount() >> 3) + ((msg->torrent->pieceCount() & 7) != 0 ? 1 : 0) + 1U; - } - - /* we don't know the piece count yet, - so we can only guess whether to send true or false */ - if (msg->metadata_size_hint > 0) - { - return len <= msg->metadata_size_hint; - } - - return true; - - case BtPeerMsgs::Request: - case BtPeerMsgs::Cancel: - case BtPeerMsgs::FextReject: - return len == 13; - - case BtPeerMsgs::Piece: - return len > 9 && len <= 16393; - - case BtPeerMsgs::Port: - return len == 3; - - case BtPeerMsgs::Ltep: - return len >= 2; - - default: - return false; - } -} - int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr> block_data, tr_block_index_t block); ReadResult read_piece_data(tr_peerMsgsImpl* msgs, libtransmission::Buffer& payload) @@ -1398,7 +1381,7 @@ ReadResult process_peer_message(tr_peerMsgsImpl* msgs, uint8_t id, libtransmissi static_cast(id), std::size(payload))); - if (!messageLengthIsCorrect(msgs, id, sizeof(id) + std::size(payload))) + if (!messageLengthIsCorrect(msgs->torrent, id, sizeof(id) + std::size(payload))) { logdbg( msgs, @@ -1748,15 +1731,15 @@ ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece) } // read - auto& current_payload = msgs->incoming.payload; + auto& payload = msgs->incoming.payload; auto const full_payload_len = *current_message_len - sizeof(uint8_t /*message_type*/); - auto n_left = full_payload_len - std::size(current_payload); + auto n_left = full_payload_len - std::size(payload); while (n_left > 0U && io->read_buffer_size() > 0U) { auto buf = std::array{}; auto const n_this_pass = std::min({ n_left, io->read_buffer_size(), std::size(buf) }); io->read_bytes(std::data(buf), n_this_pass); - current_payload.add(std::data(buf), n_this_pass); + payload.add(std::data(buf), n_this_pass); n_left -= n_this_pass; logtrace(msgs, fmt::format("read {:d} payload bytes; {:d} left to go", n_this_pass, n_left)); } @@ -1766,17 +1749,10 @@ ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece) return READ_LATER; } - // The incoming message is now complete. Reset the peerMsgs' incoming - // field so it's ready to receive the next message, then process the - // current one with `process_peer_message()`. - - current_message_len.reset(); - auto const message_type = *current_message_type; + auto const [read_state, n_piece_bytes_read] = process_peer_message(msgs, *current_message_type, payload); current_message_type.reset(); - auto payload = libtransmission::Buffer{}; - std::swap(payload, current_payload); - - auto const [read_state, n_piece_bytes_read] = process_peer_message(msgs, message_type, payload); + current_message_len.reset(); + payload.clear(); *piece = n_piece_bytes_read; return read_state; } @@ -1797,26 +1773,11 @@ void updateMetadataRequests(tr_peerMsgsImpl* msgs, time_t now) if (auto const piece = tr_torrentGetNextMetadataRequest(msgs->torrent, now); piece) { - auto& out = msgs->outMessages; - - /* build the data message */ auto tmp = tr_variant{}; tr_variantInitDict(&tmp, 3); tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Request); tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece); - auto const payload = tr_variantToStr(&tmp, TR_VARIANT_FMT_BENC); - - logtrace(msgs, fmt::format(FMT_STRING("requesting metadata piece #{:d}"), *piece)); - - /* write it out as a LTEP message to our outMessages buffer */ - out.add_uint32(2 * sizeof(uint8_t) + std::size(payload)); - out.add_uint8(BtPeerMsgs::Ltep); - out.add_uint8(msgs->ut_metadata_id); - out.add(payload); - msgs->pokeBatchPeriod(HighPriorityIntervalSecs); - msgs->dbgOutMessageLen(); - - /* cleanup */ + protocol_send_message(msgs, BtPeerMsgs::Ltep, msgs->ut_metadata_id, tr_variantToStr(&tmp, TR_VARIANT_FMT_BENC)); tr_variantClear(&tmp); } } @@ -1851,176 +1812,136 @@ void updateBlockRequests(tr_peerMsgsImpl* msgs) } } -size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now) +namespace peer_pulse_helpers { - size_t bytes_written = 0; - struct peer_request req; - bool const have_messages = !std::empty(msgs->outMessages); - bool const fext = msgs->io->supports_fext(); +[[nodiscard]] size_t add_next_metadata_piece(tr_peerMsgsImpl* msgs) +{ + auto const piece = popNextMetadataRequest(msgs); - // --- Protocol messages - - if (have_messages && msgs->outMessagesBatchedAt == 0) /* fresh batch */ + if (!piece.has_value()) // no pending requests { - logtrace(msgs, fmt::format(FMT_STRING("started an outMessages batch (length is {:d})"), std::size(msgs->outMessages))); - msgs->outMessagesBatchedAt = now; - } - else if (have_messages && now - msgs->outMessagesBatchedAt >= msgs->outMessagesBatchPeriod) - { - auto const len = std::size(msgs->outMessages); - /* flush the protocol messages */ - logtrace(msgs, fmt::format(FMT_STRING("flushing outMessages... to {:p} (length is {:d})"), fmt::ptr(msgs->io), len)); - msgs->io->write(msgs->outMessages, false); - msgs->clientSentAnythingAt = now; - msgs->outMessagesBatchedAt = 0; - msgs->outMessagesBatchPeriod = LowPriorityIntervalSecs; - bytes_written += len; + return {}; } - // --- Metadata Pieces - - if (auto piece = int{}; - msgs->io->get_write_buffer_space(now) >= METADATA_PIECE_SIZE && popNextMetadataRequest(msgs, &piece)) + auto const data = tr_torrentGetMetadataPiece(msgs->torrent, *piece); + if (!data.has_value()) { - auto ok = bool{ false }; + // send a reject + auto tmp = tr_variant{}; + tr_variantInitDict(&tmp, 2); + tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Reject); + tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece); + auto const n_bytes_written = protocol_send_message( + msgs, + BtPeerMsgs::Ltep, + msgs->ut_metadata_id, + tr_variantToStr(&tmp, TR_VARIANT_FMT_BENC)); + tr_variantClear(&tmp); + return n_bytes_written; + } - if (auto const piece_data = tr_torrentGetMetadataPiece(msgs->torrent, piece); piece_data) + // send the metadata + auto const data_sv = std::string_view{ reinterpret_cast(std::data(*data)), std::size(*data) }; + auto tmp = tr_variant{}; + tr_variantInitDict(&tmp, 3); + tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Data); + tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece); + tr_variantDictAddInt(&tmp, TR_KEY_total_size, msgs->torrent->infoDictSize()); + auto const n_bytes_written = protocol_send_message( + msgs, + BtPeerMsgs::Ltep, + msgs->ut_metadata_id, + tr_variantToStr(&tmp, TR_VARIANT_FMT_BENC), + data_sv); + tr_variantClear(&tmp); + return n_bytes_written; +} + +[[nodiscard]] size_t add_next_piece(tr_peerMsgsImpl* msgs, uint64_t now) +{ + if (msgs->io->get_write_buffer_space(now) == 0U || std::empty(msgs->peer_requested_)) + { + return {}; + } + + auto const req = msgs->peer_requested_.front(); + msgs->peer_requested_.erase(std::begin(msgs->peer_requested_)); + + auto buf = std::array{}; + auto ok = msgs->isValidRequest(req) && msgs->torrent->hasPiece(req.index); + + if (ok) + { + ok = msgs->torrent->ensurePieceIsChecked(req.index); + + if (!ok) { - auto& out = msgs->outMessages; - - /* build the data message */ - auto tmp = tr_variant{}; - tr_variantInitDict(&tmp, 3); - tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Data); - tr_variantDictAddInt(&tmp, TR_KEY_piece, piece); - tr_variantDictAddInt(&tmp, TR_KEY_total_size, msgs->torrent->infoDictSize()); - auto const payload = tr_variantToStr(&tmp, TR_VARIANT_FMT_BENC); - - /* write it out as a LTEP message to our outMessages buffer */ - out.add_uint32(2 * sizeof(uint8_t) + std::size(payload) + std::size(*piece_data)); - out.add_uint8(BtPeerMsgs::Ltep); - out.add_uint8(msgs->ut_metadata_id); - out.add(payload); - out.add(*piece_data); - msgs->pokeBatchPeriod(HighPriorityIntervalSecs); - msgs->dbgOutMessageLen(); - - tr_variantClear(&tmp); - - ok = true; - } - - if (!ok) /* send a rejection message */ - { - auto& out = msgs->outMessages; - - /* build the rejection message */ - auto tmp = tr_variant{}; - tr_variantInitDict(&tmp, 2); - tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Reject); - tr_variantDictAddInt(&tmp, TR_KEY_piece, piece); - auto payload = tr_variantToStr(&tmp, TR_VARIANT_FMT_BENC); - - /* write it out as a LTEP message to our outMessages buffer */ - out.add_uint32(2 * sizeof(uint8_t) + std::size(payload)); - out.add_uint8(BtPeerMsgs::Ltep); - out.add_uint8(msgs->ut_metadata_id); - out.add(payload); - msgs->pokeBatchPeriod(HighPriorityIntervalSecs); - msgs->dbgOutMessageLen(); - - tr_variantClear(&tmp); + msgs->torrent->setLocalError( + fmt::format(FMT_STRING("Please Verify Local Data! Piece #{:d} is corrupt."), req.index)); } } - // --- Data Blocks - - if (msgs->io->get_write_buffer_space(now) >= tr_block_info::BlockSize && !std::empty(msgs->peer_requested_)) + if (ok) { - req = msgs->peer_requested_.front(); - msgs->peer_requested_.erase(std::begin(msgs->peer_requested_)); + ok = msgs->session->cache + ->readBlock(msgs->torrent, msgs->torrent->pieceLoc(req.index, req.offset), req.length, std::data(buf)) == 0; + } - if (msgs->isValidRequest(req) && msgs->torrent->hasPiece(req.index)) + if (ok) + { + auto const piece_data = std::string_view{ reinterpret_cast(std::data(buf)), req.length }; + return protocol_send_message(msgs, BtPeerMsgs::Piece, req.index, req.offset, piece_data); + } + + if (msgs->io->supports_fext()) + { + return protocolSendReject(msgs, &req); + } + + return {}; +} + +[[nodiscard]] size_t fill_output_buffer(tr_peerMsgsImpl* msgs, time_t now) +{ + auto n_bytes_written = size_t{}; + + // fulfuill metadata requests + for (;;) + { + auto const old_len = n_bytes_written; + n_bytes_written += add_next_metadata_piece(msgs); + if (old_len == n_bytes_written) { - uint32_t const msglen = 4 + 1 + 4 + 4 + req.length; - - auto out = libtransmission::Buffer{}; - out.reserve(msglen); - - out.add_uint32(sizeof(uint8_t) + 2 * sizeof(uint32_t) + req.length); - out.add_uint8(BtPeerMsgs::Piece); - out.add_uint32(req.index); - out.add_uint32(req.offset); - auto buf = std::array{}; - bool err = msgs->session->cache->readBlock( - msgs->torrent, - msgs->torrent->pieceLoc(req.index, req.offset), - req.length, - std::data(buf)) != 0; - out.add(std::data(buf), req.length); - - /* check the piece if it needs checking... */ - if (!err) - { - err = !msgs->torrent->ensurePieceIsChecked(req.index); - if (err) - { - msgs->torrent->setLocalError( - fmt::format(FMT_STRING("Please Verify Local Data! Piece #{:d} is corrupt."), req.index)); - } - } - - if (err) - { - if (fext) - { - protocolSendReject(msgs, &req); - } - } - else - { - logtrace(msgs, fmt::format(FMT_STRING("sending block {:d}:{:d}->{:d}"), req.index, req.offset, req.length)); - auto const n = std::size(out); - TR_ASSERT(n == msglen); - msgs->io->write(out, true); - bytes_written += n; - msgs->clientSentAnythingAt = now; - msgs->blocks_sent_to_peer.add(tr_time(), 1); - } - - if (err) - { - bytes_written = 0; - msgs = nullptr; - } - } - else if (fext) /* peer needs a reject message */ - { - protocolSendReject(msgs, &req); - } - - if (msgs != nullptr) - { - prefetchPieces(msgs); + break; } } - // --- Keepalive + // fulfuill piece requests + for (;;) + { + auto const old_len = n_bytes_written; + n_bytes_written += add_next_piece(msgs, now); + if (old_len == n_bytes_written) + { + break; + } + } if (msgs != nullptr && msgs->clientSentAnythingAt != 0 && now - msgs->clientSentAnythingAt > KeepaliveIntervalSecs) { - logtrace(msgs, "sending a keepalive message"); - msgs->outMessages.add_uint32(0); - msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs); + n_bytes_written += protocol_send_keepalive(msgs); } - return bytes_written; + return n_bytes_written; } +} // namespace peer_pulse_helpers void peerPulse(void* vmsgs) { + using namespace peer_pulse_helpers; + auto* msgs = static_cast(vmsgs); - time_t const now = tr_time(); + auto const now = tr_time(); updateDesiredRequestCount(msgs); updateBlockRequests(msgs); @@ -2028,7 +1949,7 @@ void peerPulse(void* vmsgs) for (;;) { - if (fillOutputBuffer(msgs, now) < 1) + if (fill_output_buffer(msgs, now) == 0U) { break; } @@ -2040,35 +1961,21 @@ void gotError(tr_peerIo* /*io*/, tr_error const& /*error*/, void* vmsgs) static_cast(vmsgs)->publish(tr_peer_event::GotError(ENOTCONN)); } -void sendBitfield(tr_peerMsgsImpl* msgs) -{ - TR_ASSERT(msgs->torrent->hasMetainfo()); - - auto& out = msgs->outMessages; - - auto bytes = msgs->torrent->createPieceBitfield(); - out.add_uint32(sizeof(uint8_t) + bytes.size()); - out.add_uint8(BtPeerMsgs::Bitfield); - out.add(bytes); - logtrace(msgs, fmt::format(FMT_STRING("sending bitfield... outMessage size is now {:d}"), std::size(out))); - msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs); -} - void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs) { bool const fext = msgs->io->supports_fext(); if (fext && msgs->torrent->hasAll()) { - protocolSendHaveAll(msgs); + protocol_send_message(msgs, BtPeerMsgs::FextHaveAll); } else if (fext && msgs->torrent->hasNone()) { - protocolSendHaveNone(msgs); + protocol_send_message(msgs, BtPeerMsgs::FextHaveNone); } else if (!msgs->torrent->hasNone()) { - sendBitfield(msgs); + protocol_send_message(msgs, BtPeerMsgs::Bitfield, msgs->torrent->createPieceBitfield()); } } @@ -2127,8 +2034,6 @@ void tr_peerMsgsImpl::sendPex() return; } - auto& out = this->outMessages; - // update msgs std::swap(old4, new4); std::swap(old6, new6); @@ -2201,15 +2106,7 @@ void tr_peerMsgsImpl::sendPex() tr_variantDictAddRaw(&val, TR_KEY_dropped6, std::data(tmpbuf), std::size(tmpbuf)); } - /* write the pex message */ - auto payload = tr_variantToStr(&val, TR_VARIANT_FMT_BENC); - out.add_uint32(2 * sizeof(uint8_t) + std::size(payload)); - out.add_uint8(BtPeerMsgs::Ltep); - out.add_uint8(this->ut_pex_id); - out.add(payload); - this->pokeBatchPeriod(HighPriorityIntervalSecs); - logtrace(this, fmt::format(FMT_STRING("sending a pex message; outMessage size is now {:d}"), std::size(out))); - this->dbgOutMessageLen(); + protocol_send_message(this, BtPeerMsgs::Ltep, this->ut_pex_id, tr_variantToStr(&val, TR_VARIANT_FMT_BENC)); tr_variantClear(&val); }