// This file Copyright © Mnemosyne LLC. // It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only), // or any future license endorsed by Mnemosyne LLC. // License text can be found in the licenses/ folder. #include #include #include #include #include #include // uint8_t, uint32_t, int64_t #include #include #include #include // std::unique_ptr #include #include #include #include #include #include #include #include #include #include #include #include "libtransmission/transmission.h" #include "libtransmission/bitfield.h" #include "libtransmission/block-info.h" #include "libtransmission/cache.h" #include "libtransmission/crypto-utils.h" #include "libtransmission/interned-string.h" #include "libtransmission/log.h" #include "libtransmission/peer-common.h" #include "libtransmission/peer-io.h" #include "libtransmission/peer-mgr.h" #include "libtransmission/peer-msgs.h" #include "libtransmission/quark.h" #include "libtransmission/session.h" #include "libtransmission/timer.h" #include "libtransmission/torrent-magnet.h" #include "libtransmission/torrent.h" #include "libtransmission/tr-assert.h" #include "libtransmission/tr-buffer.h" #include "libtransmission/tr-macros.h" #include "libtransmission/utils.h" #include "libtransmission/variant.h" #include "libtransmission/version.h" struct tr_error; #ifndef EBADMSG #define EBADMSG EINVAL #endif using namespace std::literals; namespace { // initial capacity is big enough to hold a BtPeerMsgs::Piece message using MessageBuffer = libtransmission::StackBuffer>; using MessageReader = libtransmission::BufferReader; using MessageWriter = libtransmission::BufferWriter; // these values are hardcoded by various BEPs as noted namespace BtPeerMsgs { // https://www.bittorrent.org/beps/bep_0003.html#peer-messages auto constexpr Choke = uint8_t{ 0 }; auto constexpr Unchoke = uint8_t{ 1 }; auto constexpr Interested = uint8_t{ 2 }; auto constexpr NotInterested = uint8_t{ 3 }; auto constexpr Have = uint8_t{ 4 }; auto constexpr Bitfield = uint8_t{ 5 }; auto constexpr Request = uint8_t{ 6 }; auto constexpr Piece = uint8_t{ 7 }; auto constexpr Cancel = uint8_t{ 8 }; // https://www.bittorrent.org/beps/bep_0005.html#bittorrent-protocol-extension auto constexpr DhtPort = uint8_t{ 9 }; // https://www.bittorrent.org/beps/bep_0006.html auto constexpr FextSuggest = uint8_t{ 13 }; auto constexpr FextHaveAll = uint8_t{ 14 }; auto constexpr FextHaveNone = uint8_t{ 15 }; auto constexpr FextReject = uint8_t{ 16 }; auto constexpr FextAllowedFast = uint8_t{ 17 }; // https://www.bittorrent.org/beps/bep_0010.html // see also LtepMessageIds below auto constexpr Ltep = uint8_t{ 20 }; [[nodiscard]] constexpr std::string_view debug_name(uint8_t type) noexcept { switch (type) { case Bitfield: return "bitfield"sv; case Cancel: return "cancel"sv; case Choke: return "choke"sv; case FextAllowedFast: return "fext-allow-fast"sv; case FextHaveAll: return "fext-have-all"sv; case FextHaveNone: return "fext-have-none"sv; case FextReject: return "fext-reject"sv; case FextSuggest: return "fext-suggest"sv; case Have: return "have"sv; case Interested: return "interested"sv; case Ltep: return "ltep"sv; case NotInterested: return "not-interested"sv; case Piece: return "piece"sv; case DhtPort: return "dht-port"sv; case Request: return "request"sv; case Unchoke: return "unchoke"sv; default: return "unknown"sv; } } } // namespace BtPeerMsgs namespace LtepMessages { // https://www.bittorrent.org/beps/bep_0010.html auto constexpr Handshake = uint8_t{ 0 }; } // namespace LtepMessages // https://www.bittorrent.org/beps/bep_0010.html // Client-defined extension message IDs that we tell peers about // in the LTEP handshake and will respond to when sent in an LTEP // message. enum LtepMessageIds : uint8_t { // we support peer exchange (bep 11) // https://www.bittorrent.org/beps/bep_0011.html UT_PEX_ID = 1, // we support sending metadata files (bep 9) // https://www.bittorrent.org/beps/bep_0009.html // see also MetadataMsgType below UT_METADATA_ID = 3, }; // https://www.bittorrent.org/beps/bep_0009.html namespace MetadataMsgType { auto constexpr Request = 0; auto constexpr Data = 1; auto constexpr Reject = 2; } // namespace MetadataMsgType auto constexpr MinChokePeriodSec = time_t{ 10 }; // idle seconds before we send a keepalive auto constexpr KeepaliveIntervalSecs = time_t{ 100 }; auto constexpr MetadataReqQ = size_t{ 64U }; auto constexpr ReqQ = 512; // when we're making requests from another peer, // batch them together to send enough requests to // meet our bandwidth goals for the next N seconds auto constexpr RequestBufSecs = time_t{ 10 }; // --- auto constexpr MaxPexPeerCount = size_t{ 50U }; // --- enum class EncryptionPreference : uint8_t { Unknown, Yes, No }; // --- struct peer_request { uint32_t index = 0; uint32_t offset = 0; uint32_t length = 0; [[nodiscard]] auto constexpr operator==(peer_request const& that) const noexcept { return this->index == that.index && this->offset == that.offset && this->length == that.length; } [[nodiscard]] static auto from_block(tr_torrent const& tor, tr_block_index_t block) noexcept { auto const loc = tor.block_loc(block); return peer_request{ loc.piece, loc.piece_offset, tor.block_size(block) }; } }; // --- /* this is raw, unchanged data from the peer regarding * the current message that it's sending us. */ struct tr_incoming { std::optional length; // the full message payload length. Includes the +1 for id length std::optional id; // the protocol message, e.g. BtPeerMsgs::Piece MessageBuffer payload; struct incoming_piece_data { explicit incoming_piece_data(uint32_t block_size) : buf{ std::make_unique(block_size) } , block_size_{ block_size } { } [[nodiscard]] bool add_span(size_t begin, size_t end) { if (begin > end || end > block_size_) { return false; } for (; begin < end; ++begin) { have_.set(begin); } return true; } [[nodiscard]] auto has_all() const noexcept { return have_.count() >= block_size_; } std::unique_ptr buf; private: std::bitset have_; uint32_t const block_size_; }; std::unordered_map blocks; }; #define myLogMacro(msgs, level, text) \ do \ { \ if (tr_logLevelIsActive(level)) \ { \ tr_logAddMessage( \ __FILE__, \ __LINE__, \ (level), \ fmt::format("{:s} [{:s}]: {:s}", (msgs)->display_name(), (msgs)->user_agent().sv(), text), \ (msgs)->tor_.name()); \ } \ } while (0) #define logdbg(msgs, text) myLogMacro(msgs, TR_LOG_DEBUG, text) #define logtrace(msgs, text) myLogMacro(msgs, TR_LOG_TRACE, text) #define logwarn(msgs, text) myLogMacro(msgs, TR_LOG_WARN, text) using ReadResult = std::pair; /** * Low-level communication state information about a connected peer. * * This structure remembers the low-level protocol states that we're * in with this peer, such as active requests, pex messages, and so on. * Its fields are all private to peer-msgs.c. * * Data not directly involved with sending & receiving messages is * stored in tr_peer, where it can be accessed by both peermsgs and * the peer manager. * * @see tr_peer * @see tr_peer_info */ class tr_peerMsgsImpl final : public tr_peerMsgs { public: tr_peerMsgsImpl( tr_torrent& torrent_in, std::shared_ptr peer_info_in, std::shared_ptr io_in, tr_interned_string client, tr_peer_callback_bt callback, void* callback_data) : tr_peerMsgs{ torrent_in, std::move(peer_info_in), client, io_in->is_encrypted(), io_in->is_incoming(), io_in->is_utp() } , tor_{ torrent_in } , io_{ std::move(io_in) } , have_{ torrent_in.piece_count() } , callback_{ callback } , callback_data_{ callback_data } { if (tor_.allows_pex()) { pex_timer_ = session->timerMaker().create([this]() { send_ut_pex(); }); pex_timer_->start_repeating(SendPexInterval); } if (io_->supports_ltep()) { send_ltep_handshake(); } protocol_send_bitfield(); if (session->allowsDHT() && io_->supports_dht()) { protocol_send_dht_port(session->udpPort()); } io_->set_callbacks(can_read, did_write, got_error, this); update_desired_request_count(); update_active(); } tr_peerMsgsImpl(tr_peerMsgsImpl&&) = delete; tr_peerMsgsImpl(tr_peerMsgsImpl const&) = delete; tr_peerMsgsImpl& operator=(tr_peerMsgsImpl&&) = delete; tr_peerMsgsImpl& operator=(tr_peerMsgsImpl const&) = delete; ~tr_peerMsgsImpl() override { set_active(TR_UP, false); set_active(TR_DOWN, false); if (io_) { io_->clear(); } } // --- [[nodiscard]] Speed get_piece_speed(uint64_t now, tr_direction dir) const override { return io_->get_piece_speed(now, dir); } [[nodiscard]] size_t active_req_count(tr_direction dir) const noexcept override { switch (dir) { case TR_CLIENT_TO_PEER: // requests we sent return tr_peerMgrCountActiveRequestsToPeer(&tor_, this); case TR_PEER_TO_CLIENT: // requests they sent return std::size(peer_requested_); default: TR_ASSERT(0); return {}; } } [[nodiscard]] tr_socket_address socket_address() const override { return io_->socket_address(); } [[nodiscard]] std::string display_name() const override { return socket_address().display_name(); } [[nodiscard]] tr_bitfield const& has() const noexcept override { return have_; } // --- void on_torrent_got_metainfo() noexcept override { update_active(); } void cancel_block_request(tr_block_index_t block) override { cancels_sent_to_peer.add(tr_time(), 1); protocol_send_cancel(peer_request::from_block(tor_, block)); } void set_choke(bool peer_is_choked) override { auto const now = tr_time(); auto const fibrillation_time = now - MinChokePeriodSec; if (choke_changed_at_ > fibrillation_time) { logtrace(this, fmt::format("Not changing choke to {} to avoid fibrillation", peer_is_choked)); } else if (this->peer_is_choked() != peer_is_choked) { set_peer_choked(peer_is_choked); // https://www.bittorrent.org/beps/bep_0006.html#reject-request // A peer SHOULD choke first and then reject requests so that // the peer receiving the choke does not re-request the pieces. protocol_send_choke(peer_is_choked); if (peer_is_choked) { reject_all_requests(); } choke_changed_at_ = now; update_active(TR_CLIENT_TO_PEER); } } void pulse() override; void on_piece_completed(tr_piece_index_t piece) override { protocol_send_have(piece); // since we have more pieces now, we might not be interested in this peer update_interest(); } void set_interested(bool interested) override { if (client_is_interested() != interested) { set_client_interested(interested); protocol_send_interest(interested); update_active(TR_PEER_TO_CLIENT); } } // --- void request_blocks(tr_block_span_t const* block_spans, size_t n_spans) override { TR_ASSERT(tor_.client_can_download()); TR_ASSERT(client_is_interested()); TR_ASSERT(!client_is_choked()); for (auto const *span = block_spans, *span_end = span + n_spans; span != span_end; ++span) { for (auto [block, block_end] = *span; block < block_end; ++block) { // Note that requests can't cross over a piece boundary. // So if a piece isn't evenly divisible by the block size, // we need to split our block request info per-piece chunks. auto const byte_begin = tor_.block_loc(block).byte; auto const block_size = tor_.block_size(block); auto const byte_end = byte_begin + block_size; for (auto offset = byte_begin; offset < byte_end;) { auto const loc = tor_.byte_loc(offset); auto const left_in_block = block_size - loc.block_offset; auto const left_in_piece = tor_.piece_size(loc.piece) - loc.piece_offset; auto const req_len = std::min(left_in_block, left_in_piece); protocol_send_request({ loc.piece, loc.piece_offset, req_len }); offset += req_len; } } tr_peerMgrClientSentRequests(&tor_, this, *span); } } void update_active() { update_active(TR_UP); update_active(TR_DOWN); } void update_active(tr_direction direction) { TR_ASSERT(tr_isDirection(direction)); set_active(direction, calculate_active(direction)); } [[nodiscard]] bool calculate_active(tr_direction direction) const { if (direction == TR_CLIENT_TO_PEER) { return peer_is_interested() && !peer_is_choked(); } // TR_PEER_TO_CLIENT if (!tor_.has_metainfo()) { return true; } auto const active = client_is_interested() && !client_is_choked(); TR_ASSERT(!active || !tor_.is_done()); return active; } private: // --- void update_interest() { // TODO(ckerr) -- might need to poke the mgr on startup // additional note (tearfur) // by "poke the mgr", Charles probably meant calling isPeerInteresting(), // then pass the result to set_interesting() } // --- [[nodiscard]] bool is_valid_request(peer_request const& req) const; void reject_all_requests() { auto& queue = peer_requested_; if (auto const must_send_rej = io_->supports_fext(); must_send_rej) { std::for_each(std::begin(queue), std::end(queue), [this](peer_request const& req) { protocol_send_reject(req); }); } queue.clear(); } [[nodiscard]] bool can_add_request_from_peer(peer_request const& req); void on_peer_made_request(peer_request const& req) { if (can_add_request_from_peer(req)) { peer_requested_.emplace_back(req); } else if (io_->supports_fext()) { protocol_send_reject(req); } } // how many blocks could we request from this peer right now? [[nodiscard]] size_t max_available_reqs() const; void update_desired_request_count() { desired_request_count_ = max_available_reqs(); } void update_block_requests(); // --- [[nodiscard]] std::optional pop_next_metadata_request() { auto& reqs = peer_requested_metadata_pieces_; if (std::empty(reqs)) { return {}; } auto next = reqs.front(); reqs.pop(); return next; } void update_metadata_requests(time_t now) const; [[nodiscard]] size_t add_next_metadata_piece(); [[nodiscard]] size_t add_next_block(time_t now_sec, uint64_t now_msec); [[nodiscard]] size_t fill_output_buffer(time_t now_sec, uint64_t now_msec); // --- void send_ltep_handshake(); void parse_ltep_handshake(MessageReader& payload); void parse_ut_metadata(MessageReader& payload_in); void parse_ut_pex(MessageReader& payload); void parse_ltep(MessageReader& payload); void send_ut_pex(); int client_got_block(std::unique_ptr block_data, tr_block_index_t block); ReadResult read_piece_data(MessageReader& payload); ReadResult process_peer_message(uint8_t id, MessageReader& payload); // --- size_t protocol_send_keepalive() const; // NOLINT(modernize-use-nodiscard) template size_t protocol_send_message(uint8_t type, Args const&... args) const; size_t protocol_send_reject(peer_request const& req) const // NOLINT(modernize-use-nodiscard) { TR_ASSERT(io_->supports_fext()); return protocol_send_message(BtPeerMsgs::FextReject, req.index, req.offset, req.length); } size_t protocol_send_cancel(peer_request const& req) const // NOLINT(modernize-use-nodiscard) { return protocol_send_message(BtPeerMsgs::Cancel, req.index, req.offset, req.length); } size_t protocol_send_request(peer_request const& req) const // NOLINT(modernize-use-nodiscard) { TR_ASSERT(is_valid_request(req)); return protocol_send_message(BtPeerMsgs::Request, req.index, req.offset, req.length); } size_t protocol_send_dht_port(tr_port const port) const // NOLINT(modernize-use-nodiscard) { return protocol_send_message(BtPeerMsgs::DhtPort, port.host()); } size_t protocol_send_have(tr_piece_index_t const index) const // NOLINT(modernize-use-nodiscard) { static_assert(sizeof(tr_piece_index_t) == sizeof(uint32_t)); return protocol_send_message(BtPeerMsgs::Have, index); } size_t protocol_send_choke(bool const choke) const // NOLINT(modernize-use-nodiscard) { return protocol_send_message(choke ? BtPeerMsgs::Choke : BtPeerMsgs::Unchoke); } void protocol_send_interest(bool const b) const { protocol_send_message(b ? BtPeerMsgs::Interested : BtPeerMsgs::NotInterested); } void protocol_send_bitfield(); // --- void publish(tr_peer_event const& peer_event) { if (callback_ != nullptr) { (*callback_)(this, peer_event, callback_data_); } } // --- static void did_write(tr_peerIo* /*io*/, size_t bytes_written, bool was_piece_data, void* vmsgs); static ReadState can_read(tr_peerIo* io, void* vmsgs, size_t* piece); static void got_error(tr_peerIo* /*io*/, tr_error const& /*error*/, void* vmsgs); // --- bool peer_supports_pex_ = false; bool peer_supports_metadata_xfer_ = false; bool client_sent_ltep_handshake_ = false; size_t desired_request_count_ = 0; uint8_t ut_pex_id_ = 0; uint8_t ut_metadata_id_ = 0; tr_port dht_port_; EncryptionPreference encryption_preference_ = EncryptionPreference::Unknown; tr_torrent& tor_; std::shared_ptr const io_; std::deque peer_requested_; std::array, NUM_TR_AF_INET_TYPES> pex_; std::queue peer_requested_metadata_pieces_; time_t client_sent_at_ = 0; time_t choke_changed_at_ = 0; tr_incoming incoming_ = {}; // if the peer supports the Extension Protocol in BEP 10 and // supplied a reqq argument, it's stored here. std::optional reqq_; std::unique_ptr pex_timer_; tr_bitfield have_; tr_peer_callback_bt const callback_; void* const callback_data_; // seconds between periodic send_ut_pex() calls static auto constexpr SendPexInterval = 90s; }; // --- [[nodiscard]] constexpr bool is_message_length_correct(tr_torrent const& tor, uint8_t id, uint32_t len) { switch (id) { case BtPeerMsgs::Choke: case BtPeerMsgs::Unchoke: case BtPeerMsgs::Interested: case BtPeerMsgs::NotInterested: case BtPeerMsgs::FextHaveAll: case BtPeerMsgs::FextHaveNone: return len == 1U; case BtPeerMsgs::Have: case BtPeerMsgs::FextSuggest: case BtPeerMsgs::FextAllowedFast: return len == 5U; case BtPeerMsgs::Bitfield: return !tor.has_metainfo() || len == 1 + ((tor.piece_count() + 7U) / 8U); case BtPeerMsgs::Request: case BtPeerMsgs::Cancel: case BtPeerMsgs::FextReject: return len == 13U; case BtPeerMsgs::Piece: len -= sizeof(id) + sizeof(uint32_t /*piece*/) + sizeof(uint32_t /*offset*/); return len <= tr_block_info::BlockSize; case BtPeerMsgs::DhtPort: return len == 3U; case BtPeerMsgs::Ltep: return len >= 2U; default: // unrecognized message return false; } } namespace protocol_send_message_helpers { [[nodiscard]] constexpr auto get_param_length(uint8_t param) noexcept { return sizeof(param); } [[nodiscard]] constexpr auto get_param_length(uint16_t param) noexcept { return sizeof(param); } [[nodiscard]] constexpr auto get_param_length(uint32_t param) noexcept { return sizeof(param); } template [[nodiscard]] TR_CONSTEXPR20 auto get_param_length(T const& param) noexcept { return std::size(param); } // --- void add_param(MessageWriter& buffer, uint8_t param) noexcept { buffer.add_uint8(param); } void add_param(MessageWriter& buffer, uint16_t param) noexcept { buffer.add_uint16(param); } void add_param(MessageWriter& buffer, uint32_t param) noexcept { buffer.add_uint32(param); } template void add_param(MessageWriter& buffer, T const& param) noexcept { buffer.add(param); } // --- [[nodiscard]] std::string log_param(uint8_t param) { return fmt::format(" {:d}", static_cast(param)); } [[nodiscard]] std::string log_param(uint16_t param) { return fmt::format(" {:d}", static_cast(param)); } [[nodiscard]] std::string log_param(uint32_t param) { return fmt::format(" {:d}", static_cast(param)); } template [[nodiscard]] std::string log_param(T const& /*unused*/) { return " []"; } template [[nodiscard]] std::string build_log_message(uint8_t type, Args const&... args) { auto text = fmt::format("sending '{:s}'", BtPeerMsgs::debug_name(type)); (text.append(log_param(args)), ...); return text; } template size_t build_peer_message(MessageWriter& out, uint8_t type, Args const&... args) { auto msg_len = sizeof(type); ((msg_len += get_param_length(args)), ...); out.add_uint32(msg_len); out.add_uint8(type); (add_param(out, args), ...); return msg_len; } } // namespace protocol_send_message_helpers template size_t tr_peerMsgsImpl::protocol_send_message(uint8_t type, Args const&... args) const { using namespace protocol_send_message_helpers; logtrace(this, build_log_message(type, args...)); auto out = MessageBuffer{}; [[maybe_unused]] auto const msg_len = build_peer_message(out, type, args...); TR_ASSERT(is_message_length_correct(tor_, type, msg_len)); auto const n_bytes_added = std::size(out); io_->write(out, type == BtPeerMsgs::Piece); return n_bytes_added; } void tr_peerMsgsImpl::protocol_send_bitfield() { bool const fext = io_->supports_fext(); if (fext && tor_.has_all()) { protocol_send_message(BtPeerMsgs::FextHaveAll); } else if (fext && tor_.has_none()) { protocol_send_message(BtPeerMsgs::FextHaveNone); } else if (!tor_.has_none()) { // https://www.bittorrent.org/beps/bep_0003.html#peer-messages // Downloaders which don't have anything yet may skip the 'bitfield' message. protocol_send_message(BtPeerMsgs::Bitfield, tor_.create_piece_bitfield()); } } size_t tr_peerMsgsImpl::protocol_send_keepalive() const { logtrace(this, "sending 'keepalive'"); auto out = MessageBuffer{}; out.add_uint32(0); auto const n_bytes_added = std::size(out); io_->write(out, false); return n_bytes_added; } // --- void tr_peerMsgsImpl::parse_ltep(MessageReader& payload) { TR_ASSERT(!std::empty(payload)); auto const ltep_msgid = payload.to_uint8(); if (ltep_msgid == LtepMessages::Handshake) { logtrace(this, "got ltep handshake"); parse_ltep_handshake(payload); if (io_->supports_ltep()) { send_ltep_handshake(); send_ut_pex(); } } else if (ltep_msgid == UT_PEX_ID) { logtrace(this, "got ut pex"); peer_supports_pex_ = true; parse_ut_pex(payload); } else if (ltep_msgid == UT_METADATA_ID) { logtrace(this, "got ut metadata"); peer_supports_metadata_xfer_ = true; parse_ut_metadata(payload); } else { logtrace(this, fmt::format("skipping unknown ltep message ({:d})", static_cast(ltep_msgid))); } } void tr_peerMsgsImpl::parse_ut_pex(MessageReader& payload) { if (!tor_.allows_pex()) { return; } if (auto var = tr_variant_serde::benc().inplace().parse(payload.to_string_view()); var) { uint8_t const* added = nullptr; auto added_len = size_t{}; if (tr_variantDictFindRaw(&*var, TR_KEY_added, &added, &added_len)) { uint8_t const* added_f = nullptr; auto added_f_len = size_t{}; if (!tr_variantDictFindRaw(&*var, TR_KEY_added_f, &added_f, &added_f_len)) { added_f_len = 0; added_f = nullptr; } auto pex = tr_pex::from_compact_ipv4(added, added_len, added_f, added_f_len); pex.resize(std::min(MaxPexPeerCount, std::size(pex))); tr_peerMgrAddPex(&tor_, TR_PEER_FROM_PEX, std::data(pex), std::size(pex)); } if (tr_variantDictFindRaw(&*var, TR_KEY_added6, &added, &added_len)) { uint8_t const* added_f = nullptr; auto added_f_len = size_t{}; if (!tr_variantDictFindRaw(&*var, TR_KEY_added6_f, &added_f, &added_f_len)) { added_f_len = 0; added_f = nullptr; } auto pex = tr_pex::from_compact_ipv6(added, added_len, added_f, added_f_len); pex.resize(std::min(MaxPexPeerCount, std::size(pex))); tr_peerMgrAddPex(&tor_, TR_PEER_FROM_PEX, std::data(pex), std::size(pex)); } } } void tr_peerMsgsImpl::send_ut_pex() { // only send pex if both the torrent and peer support it if (!peer_supports_pex_ || !tor_.allows_pex()) { return; } static auto constexpr MaxPexAdded = size_t{ 50U }; static auto constexpr MaxPexDropped = size_t{ 50U }; auto map = tr_variant::Map{ 4U }; auto tmpbuf = small::vector{}; for (uint8_t i = 0; i < NUM_TR_AF_INET_TYPES; ++i) { static auto constexpr AddedMap = std::array{ TR_KEY_added, TR_KEY_added6 }; static auto constexpr AddedFMap = std::array{ TR_KEY_added_f, TR_KEY_added6_f }; static auto constexpr DroppedMap = std::array{ TR_KEY_dropped, TR_KEY_dropped6 }; auto const ip_type = static_cast(i); auto& old_pex = pex_[i]; auto new_pex = tr_peerMgrGetPeers(&tor_, ip_type, TR_PEERS_CONNECTED, MaxPexPeerCount); auto added = std::vector{}; added.reserve(std::size(new_pex)); std::set_difference( std::begin(new_pex), std::end(new_pex), std::begin(old_pex), std::end(old_pex), std::back_inserter(added)); auto dropped = std::vector{}; dropped.reserve(std::size(old_pex)); std::set_difference( std::begin(old_pex), std::end(old_pex), std::begin(new_pex), std::end(new_pex), std::back_inserter(dropped)); // Some peers give us error messages if we send // more than this many peers in a single pex message. // https://wiki.theory.org/BitTorrentPeerExchangeConventions added.resize(std::min(std::size(added), MaxPexAdded)); dropped.resize(std::min(std::size(dropped), MaxPexDropped)); logtrace( this, fmt::format( "pex: old {:s} peer count {:d}, new peer count {:d}, added {:d}, dropped {:d}", tr_ip_protocol_to_sv(ip_type), std::size(old_pex), std::size(new_pex), std::size(added), std::size(dropped))); // if there's nothing to send, then we're done if (std::empty(added) && std::empty(dropped)) { continue; } // update msgs std::swap(old_pex, new_pex); // build the pex payload if (!std::empty(added)) { // "added" tmpbuf.clear(); tmpbuf.reserve(std::size(added) * tr_socket_address::CompactSockAddrBytes[i]); tr_pex::to_compact(std::back_inserter(tmpbuf), std::data(added), std::size(added)); TR_ASSERT(std::size(tmpbuf) == std::size(added) * tr_socket_address::CompactSockAddrBytes[i]); map.try_emplace(AddedMap[i], std::string_view{ reinterpret_cast(std::data(tmpbuf)), std::size(tmpbuf) }); // "added.f" tmpbuf.resize(std::size(added)); auto* begin = std::data(tmpbuf); auto* walk = begin; for (auto const& p : added) { *walk++ = std::byte{ p.flags }; } auto const f_len = static_cast(walk - begin); TR_ASSERT(f_len == std::size(added)); map.try_emplace(AddedFMap[i], std::string_view{ reinterpret_cast(begin), f_len }); } if (!std::empty(dropped)) { // "dropped" tmpbuf.clear(); tmpbuf.reserve(std::size(dropped) * tr_socket_address::CompactSockAddrBytes[i]); tr_pex::to_compact(std::back_inserter(tmpbuf), std::data(dropped), std::size(dropped)); TR_ASSERT(std::size(tmpbuf) == std::size(dropped) * tr_socket_address::CompactSockAddrBytes[i]); map.try_emplace(DroppedMap[i], std::string_view{ reinterpret_cast(std::data(tmpbuf)), std::size(tmpbuf) }); } } protocol_send_message(BtPeerMsgs::Ltep, ut_pex_id_, tr_variant_serde::benc().to_string(tr_variant{ std::move(map) })); } void tr_peerMsgsImpl::send_ltep_handshake() { if (client_sent_ltep_handshake_) { return; } logtrace(this, "sending an ltep handshake"); client_sent_ltep_handshake_ = true; /* decide if we want to advertise metadata xfer support (BEP 9) */ bool const allow_metadata_xfer = tor_.is_public(); /* decide if we want to advertise pex support */ bool const allow_pex = tor_.allows_pex(); auto val = tr_variant{}; tr_variantInitDict(&val, 8); tr_variantDictAddBool(&val, TR_KEY_e, session->encryptionMode() != TR_CLEAR_PREFERRED); // If connecting to global peer, then use global address // Otherwise we are connecting to local peer, use bind address directly if (auto const addr = io_->address().is_global_unicast_address() ? session->global_address(TR_AF_INET) : session->bind_address(TR_AF_INET); addr && !addr->is_any()) { TR_ASSERT(addr->is_ipv4()); tr_variantDictAddRaw(&val, TR_KEY_ipv4, &addr->addr.addr4, sizeof(addr->addr.addr4)); } if (auto const addr = io_->address().is_global_unicast_address() ? session->global_address(TR_AF_INET6) : session->bind_address(TR_AF_INET6); addr && !addr->is_any()) { TR_ASSERT(addr->is_ipv6()); tr_variantDictAddRaw(&val, TR_KEY_ipv6, &addr->addr.addr6, sizeof(addr->addr.addr6)); } // https://www.bittorrent.org/beps/bep_0009.html // It also adds "metadata_size" to the handshake message (not the // "m" dictionary) specifying an integer value of the number of // bytes of the metadata. if (auto const info_dict_size = tor_.info_dict_size(); allow_metadata_xfer && tor_.has_metainfo() && info_dict_size > 0) { tr_variantDictAddInt(&val, TR_KEY_metadata_size, info_dict_size); } // https://www.bittorrent.org/beps/bep_0010.html // Local TCP listen port. Allows each side to learn about the TCP // port number of the other side. Note that there is no need for the // receiving side of the connection to send this extension message, // since its port number is already known. tr_variantDictAddInt(&val, TR_KEY_p, session->advertisedPeerPort().host()); // https://www.bittorrent.org/beps/bep_0010.html // An integer, the number of outstanding request messages this // client supports without dropping any. The default in in // libtorrent is 250. tr_variantDictAddInt(&val, TR_KEY_reqq, ReqQ); // https://www.bittorrent.org/beps/bep_0010.html // A string containing the compact representation of the ip address this peer sees // you as. i.e. this is the receiver's external ip address (no port is included). // This may be either an IPv4 (4 bytes) or an IPv6 (16 bytes) address. { auto buf = std::array{}; auto const begin = std::data(buf); auto const end = io_->address().to_compact(begin); auto const len = end - begin; TR_ASSERT(len == tr_address::CompactAddrBytes[0] || len == tr_address::CompactAddrBytes[1]); tr_variantDictAddRaw(&val, TR_KEY_yourip, begin, len); } // https://www.bittorrent.org/beps/bep_0010.html // Client name and version (as a utf-8 string). This is a much more // reliable way of identifying the client than relying on the // peer id encoding. tr_variantDictAddStrView(&val, TR_KEY_v, TR_NAME " " USERAGENT_PREFIX); // https://www.bittorrent.org/beps/bep_0021.html // A peer that is a partial seed SHOULD include an extra header in // the extension handshake 'upload_only'. Setting the value of this // key to 1 indicates that this peer is not interested in downloading // anything. tr_variantDictAddBool(&val, TR_KEY_upload_only, tor_.is_done()); if (allow_metadata_xfer || allow_pex) { tr_variant* m = tr_variantDictAddDict(&val, TR_KEY_m, 2); if (allow_metadata_xfer) { tr_variantDictAddInt(m, TR_KEY_ut_metadata, UT_METADATA_ID); } if (allow_pex) { tr_variantDictAddInt(m, TR_KEY_ut_pex, UT_PEX_ID); } } protocol_send_message(BtPeerMsgs::Ltep, LtepMessages::Handshake, tr_variant_serde::benc().to_string(val)); } void tr_peerMsgsImpl::parse_ltep_handshake(MessageReader& payload) { auto const handshake_sv = payload.to_string_view(); auto var = tr_variant_serde::benc().inplace().parse(handshake_sv); if (!var || !var->holds_alternative()) { logtrace(this, "GET extended-handshake, couldn't get dictionary"); return; } logtrace(this, fmt::format("here is the base64-encoded handshake: [{:s}]", tr_base64_encode(handshake_sv))); // does the peer prefer encrypted connections? auto pex = tr_pex{}; auto& [addr, port] = pex.socket_address; if (auto e = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_e, &e)) { encryption_preference_ = e != 0 ? EncryptionPreference::Yes : EncryptionPreference::No; if (encryption_preference_ == EncryptionPreference::Yes) { pex.flags |= ADDED_F_ENCRYPTION_FLAG; } } // check supported messages for utorrent pex peer_supports_pex_ = false; peer_supports_metadata_xfer_ = false; if (tr_variant* sub = nullptr; tr_variantDictFindDict(&*var, TR_KEY_m, &sub)) { if (auto ut_pex = int64_t{}; tr_variantDictFindInt(sub, TR_KEY_ut_pex, &ut_pex)) { peer_supports_pex_ = ut_pex != 0; ut_pex_id_ = static_cast(ut_pex); logtrace(this, fmt::format("msgs->ut_pex is {:d}", ut_pex_id_)); } if (auto ut_metadata = int64_t{}; tr_variantDictFindInt(sub, TR_KEY_ut_metadata, &ut_metadata)) { peer_supports_metadata_xfer_ = ut_metadata != 0; ut_metadata_id_ = static_cast(ut_metadata); logtrace(this, fmt::format("msgs->ut_metadata_id_ is {:d}", ut_metadata_id_)); } if (auto ut_holepunch = int64_t{}; tr_variantDictFindInt(sub, TR_KEY_ut_holepunch, &ut_holepunch)) { // Transmission doesn't support this extension yet. // But its presence does indicate µTP supports, // which we do care about... peer_info->set_utp_supported(true); } } // look for metainfo size (BEP 9) if (auto metadata_size = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_metadata_size, &metadata_size)) { if (!tr_metadata_download::is_valid_metadata_size(metadata_size)) { peer_supports_metadata_xfer_ = false; } else { tor_.maybe_start_metadata_transfer(metadata_size); } } // look for upload_only (BEP 21) if (auto upload_only = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_upload_only, &upload_only)) { pex.flags |= ADDED_F_SEED_FLAG; } // https://www.bittorrent.org/beps/bep_0010.html // Client name and version (as a utf-8 string). This is a much more // reliable way of identifying the client than relying on the // peer id encoding. if (auto sv = std::string_view{}; tr_variantDictFindStrView(&*var, TR_KEY_v, &sv)) { set_user_agent(tr_interned_string{ sv }); } /* get peer's listening port */ if (auto p = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_p, &p) && p > 0) { port.set_host(p); publish(tr_peer_event::GotPort(port)); logtrace(this, fmt::format("peer's port is now {:d}", p)); } std::byte const* addr_compact = nullptr; auto addr_len = size_t{}; if (io_->is_incoming() && tr_variantDictFindRaw(&*var, TR_KEY_ipv4, &addr_compact, &addr_len) && addr_len == tr_address::CompactAddrBytes[TR_AF_INET]) { std::tie(addr, std::ignore) = tr_address::from_compact_ipv4(addr_compact); tr_peerMgrAddPex(&tor_, TR_PEER_FROM_LTEP, &pex, 1); } if (io_->is_incoming() && tr_variantDictFindRaw(&*var, TR_KEY_ipv6, &addr_compact, &addr_len) && addr_len == tr_address::CompactAddrBytes[TR_AF_INET6]) { std::tie(addr, std::ignore) = tr_address::from_compact_ipv6(addr_compact); tr_peerMgrAddPex(&tor_, TR_PEER_FROM_LTEP, &pex, 1); } /* get peer's maximum request queue size */ if (auto reqq_in = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_reqq, &reqq_in)) { reqq_ = reqq_in; } } void tr_peerMsgsImpl::parse_ut_metadata(MessageReader& payload_in) { int64_t msg_type = -1; int64_t piece = -1; int64_t total_size = 0; auto const tmp = payload_in.to_string_view(); auto const* const msg_end = std::data(tmp) + std::size(tmp); auto serde = tr_variant_serde::benc(); if (auto var = serde.inplace().parse(tmp); var) { (void)tr_variantDictFindInt(&*var, TR_KEY_msg_type, &msg_type); (void)tr_variantDictFindInt(&*var, TR_KEY_piece, &piece); (void)tr_variantDictFindInt(&*var, TR_KEY_total_size, &total_size); } logtrace(this, fmt::format("got ut_metadata msg: type {:d}, piece {:d}, total_size {:d}", msg_type, piece, total_size)); if (msg_type == MetadataMsgType::Reject) { // no-op } if (auto const piece_len = msg_end - serde.end(); msg_type == MetadataMsgType::Data && piece * MetadataPieceSize + piece_len <= total_size) { tor_.set_metadata_piece(piece, serde.end(), piece_len); } if (msg_type == MetadataMsgType::Request) { if (piece >= 0 && tor_.has_metainfo() && tor_.is_public() && std::size(peer_requested_metadata_pieces_) < MetadataReqQ) { peer_requested_metadata_pieces_.push(piece); } else { /* send a rejection message */ auto v = tr_variant{}; tr_variantInitDict(&v, 2); tr_variantDictAddInt(&v, TR_KEY_msg_type, MetadataMsgType::Reject); tr_variantDictAddInt(&v, TR_KEY_piece, piece); protocol_send_message(BtPeerMsgs::Ltep, ut_metadata_id_, serde.to_string(v)); } } } // --- ReadResult tr_peerMsgsImpl::process_peer_message(uint8_t id, MessageReader& payload) { bool const fext = io_->supports_fext(); auto ui32 = uint32_t{}; logtrace( this, fmt::format( "got peer msg '{:s}' ({:d}) with payload len {:d}", BtPeerMsgs::debug_name(id), static_cast(id), std::size(payload))); if (!is_message_length_correct(tor_, id, sizeof(id) + std::size(payload))) { logdbg( this, fmt::format( "bad msg: '{:s}' ({:d}) with payload len {:d}", BtPeerMsgs::debug_name(id), static_cast(id), std::size(payload))); publish(tr_peer_event::GotError(EMSGSIZE)); return { READ_ERR, {} }; } switch (id) { case BtPeerMsgs::Choke: logtrace(this, "got Choke"); set_client_choked(true); if (!fext) { publish(tr_peer_event::GotChoke()); } update_active(TR_PEER_TO_CLIENT); break; case BtPeerMsgs::Unchoke: logtrace(this, "got Unchoke"); set_client_choked(false); update_active(TR_PEER_TO_CLIENT); update_desired_request_count(); break; case BtPeerMsgs::Interested: logtrace(this, "got Interested"); set_peer_interested(true); update_active(TR_CLIENT_TO_PEER); break; case BtPeerMsgs::NotInterested: logtrace(this, "got Not Interested"); set_peer_interested(false); update_active(TR_CLIENT_TO_PEER); break; case BtPeerMsgs::Have: ui32 = payload.to_uint32(); logtrace(this, fmt::format("got Have: {:d}", ui32)); if (tor_.has_metainfo() && ui32 >= tor_.piece_count()) { publish(tr_peer_event::GotError(ERANGE)); return { READ_ERR, {} }; } /* a peer can send the same HAVE message twice... */ if (!have_.test(ui32)) { have_.set(ui32); publish(tr_peer_event::GotHave(ui32)); } break; case BtPeerMsgs::Bitfield: logtrace(this, "got a bitfield"); have_ = tr_bitfield{ tor_.has_metainfo() ? tor_.piece_count() : std::size(payload) * 8 }; have_.set_raw(reinterpret_cast(std::data(payload)), std::size(payload)); publish(tr_peer_event::GotBitfield(&have_)); break; case BtPeerMsgs::Request: { struct peer_request r; r.index = payload.to_uint32(); r.offset = payload.to_uint32(); r.length = payload.to_uint32(); logtrace(this, fmt::format("got Request: {:d}:{:d}->{:d}", r.index, r.offset, r.length)); on_peer_made_request(r); break; } case BtPeerMsgs::Cancel: { struct peer_request r; r.index = payload.to_uint32(); r.offset = payload.to_uint32(); r.length = payload.to_uint32(); cancels_sent_to_client.add(tr_time(), 1); logtrace(this, fmt::format("got a Cancel {:d}:{:d}->{:d}", r.index, r.offset, r.length)); auto& requests = peer_requested_; if (auto iter = std::find(std::begin(requests), std::end(requests), r); iter != std::end(requests)) { requests.erase(iter); // bep6: "Even when a request is cancelled, the peer // receiving the cancel should respond with either the // corresponding reject or the corresponding piece" if (fext) { protocol_send_reject(r); } } break; } case BtPeerMsgs::Piece: return read_piece_data(payload); case BtPeerMsgs::DhtPort: // https://www.bittorrent.org/beps/bep_0005.html // Peers supporting the DHT set the last bit of the 8-byte reserved flags // exchanged in the BitTorrent protocol handshake. Peer receiving a handshake // indicating the remote peer supports the DHT should send a PORT message. // It begins with byte 0x09 and has a two byte payload containing the UDP // port of the DHT node in network byte order. { logtrace(this, "Got a BtPeerMsgs::DhtPort"); auto const hport = payload.to_uint16(); if (auto const dht_port = tr_port::from_host(hport); !std::empty(dht_port)) { dht_port_ = dht_port; session->maybe_add_dht_node(io_->address(), dht_port_); } } break; case BtPeerMsgs::FextSuggest: logtrace(this, "Got a BtPeerMsgs::FextSuggest"); if (fext) { auto const piece = payload.to_uint32(); publish(tr_peer_event::GotSuggest(piece)); } else { publish(tr_peer_event::GotError(EMSGSIZE)); return { READ_ERR, {} }; } break; case BtPeerMsgs::FextAllowedFast: logtrace(this, "Got a BtPeerMsgs::FextAllowedFast"); if (fext) { auto const piece = payload.to_uint32(); publish(tr_peer_event::GotAllowedFast(piece)); } else { publish(tr_peer_event::GotError(EMSGSIZE)); return { READ_ERR, {} }; } break; case BtPeerMsgs::FextHaveAll: logtrace(this, "Got a BtPeerMsgs::FextHaveAll"); if (fext) { have_.set_has_all(); publish(tr_peer_event::GotHaveAll()); } else { publish(tr_peer_event::GotError(EMSGSIZE)); return { READ_ERR, {} }; } break; case BtPeerMsgs::FextHaveNone: logtrace(this, "Got a BtPeerMsgs::FextHaveNone"); if (fext) { have_.set_has_none(); publish(tr_peer_event::GotHaveNone()); } else { publish(tr_peer_event::GotError(EMSGSIZE)); return { READ_ERR, {} }; } break; case BtPeerMsgs::FextReject: { struct peer_request r; r.index = payload.to_uint32(); r.offset = payload.to_uint32(); r.length = payload.to_uint32(); if (fext) { publish(tr_peer_event::GotRejected(tor_.block_info(), tor_.piece_loc(r.index, r.offset).block)); } else { publish(tr_peer_event::GotError(EMSGSIZE)); return { READ_ERR, {} }; } break; } case BtPeerMsgs::Ltep: logtrace(this, "Got a BtPeerMsgs::Ltep"); parse_ltep(payload); break; default: logtrace(this, fmt::format("peer sent us an UNKNOWN: {:d}", static_cast(id))); break; } return { READ_NOW, {} }; } ReadResult tr_peerMsgsImpl::read_piece_data(MessageReader& payload) { // auto const piece = payload.to_uint32(); auto const offset = payload.to_uint32(); auto const len = std::size(payload); auto const loc = tor_.piece_loc(piece, offset); auto const block = loc.block; auto const block_size = tor_.block_size(block); logtrace(this, fmt::format("got {:d} bytes for req {:d}:{:d}->{:d}", len, piece, offset, len)); if (loc.block_offset + len > block_size) { logwarn(this, fmt::format("got unaligned piece {:d}:{:d}->{:d}", piece, offset, len)); return { READ_ERR, len }; } if (!tr_peerMgrDidPeerRequest(&tor_, this, block)) { logwarn(this, fmt::format("got unrequested piece {:d}:{:d}->{:d}", piece, offset, len)); return { READ_ERR, len }; } publish(tr_peer_event::GotPieceData(len)); if (loc.block_offset == 0U && len == block_size) // simple case: one message has entire block { auto buf = std::make_unique(block_size); payload.to_buf(std::data(*buf), len); auto const ok = client_got_block(std::move(buf), block) == 0; return { ok ? READ_NOW : READ_ERR, len }; } auto& blocks = incoming_.blocks; auto& incoming_block = blocks.try_emplace(block, block_size).first->second; payload.to_buf(std::data(*incoming_block.buf) + loc.block_offset, len); if (!incoming_block.add_span(loc.block_offset, loc.block_offset + len)) { return { READ_ERR, len }; // invalid span } if (!incoming_block.has_all()) { return { READ_LATER, len }; // we don't have the full block yet } auto block_buf = std::move(incoming_block.buf); blocks.erase(block); // note: invalidates `incoming_block` local auto const ok = client_got_block(std::move(block_buf), block) == 0; return { ok ? READ_NOW : READ_ERR, len }; } // returns 0 on success, or an errno on failure int tr_peerMsgsImpl::client_got_block(std::unique_ptr block_data, tr_block_index_t const block) { auto const n_expected = tor_.block_size(block); if (!block_data) { logdbg(this, fmt::format("wrong block size: expected {:d}, got {:d}", n_expected, 0)); return EMSGSIZE; } if (std::size(*block_data) != tor_.block_size(block)) { logdbg(this, fmt::format("wrong block size: expected {:d}, got {:d}", n_expected, std::size(*block_data))); return EMSGSIZE; } logtrace(this, fmt::format("got block {:d}", block)); if (!tr_peerMgrDidPeerRequest(&tor_, this, block)) { logdbg(this, "we didn't ask for this message..."); return 0; } auto const loc = tor_.block_loc(block); if (tor_.has_piece(loc.piece)) { logtrace(this, "we did ask for this message, but the piece is already complete..."); return 0; } // NB: if writeBlock() fails the torrent may be paused. // If this happens, this object will be destructed and must no longer be used. if (auto const err = session->cache->write_block(tor_.id(), block, std::move(block_data)); err != 0) { return err; } blame.set(loc.piece); publish(tr_peer_event::GotBlock(tor_.block_info(), block)); return 0; } // --- void tr_peerMsgsImpl::did_write(tr_peerIo* /*io*/, size_t bytes_written, bool was_piece_data, void* vmsgs) { auto* const msgs = static_cast(vmsgs); if (was_piece_data) { msgs->publish(tr_peer_event::SentPieceData(bytes_written)); } msgs->pulse(); } ReadState tr_peerMsgsImpl::can_read(tr_peerIo* io, void* vmsgs, size_t* piece) { auto* const msgs = static_cast(vmsgs); // https://www.bittorrent.org/beps/bep_0003.html // Next comes an alternating stream of length prefixes and messages. // Messages of length zero are keepalives, and ignored. // All non-keepalive messages start with a single byte which gives their type. // // https://wiki.theory.org/BitTorrentSpecification // All of the remaining messages in the protocol take the form of // . The length prefix is a four byte // big-endian value. The message ID is a single decimal byte. // The payload is message dependent. // read auto& current_message_len = msgs->incoming_.length; // the full message payload length. Includes the +1 for id length if (!current_message_len) { auto message_len = uint32_t{}; if (io->read_buffer_size() < sizeof(message_len)) { return READ_LATER; } io->read_uint32(&message_len); // The keep-alive message is a message with zero bytes, // specified with the length prefix set to zero. // There is no message ID and no payload. if (message_len == 0U) { logtrace(msgs, "got KeepAlive"); return READ_NOW; } current_message_len = message_len; } // read auto& current_message_type = msgs->incoming_.id; if (!current_message_type) { auto message_type = uint8_t{}; if (io->read_buffer_size() < sizeof(message_type)) { return READ_LATER; } io->read_uint8(&message_type); current_message_type = message_type; } // read auto& current_payload = msgs->incoming_.payload; auto const full_payload_len = *current_message_len - sizeof(*current_message_type); auto n_left = full_payload_len - std::size(current_payload); auto const [buf, n_this_pass] = current_payload.reserve_space(std::min(n_left, io->read_buffer_size())); io->read_bytes(buf, n_this_pass); current_payload.commit_space(n_this_pass); n_left -= n_this_pass; logtrace(msgs, fmt::format("read {:d} payload bytes; {:d} left to go", n_this_pass, n_left)); if (n_left > 0U) { return READ_LATER; } // The incoming message is now complete. After processing the message // with `process_peer_message()`, reset the peerMsgs' incoming // field so it's ready to receive the next message. auto const [read_state, n_piece_bytes_read] = msgs->process_peer_message(*current_message_type, current_payload); *piece = n_piece_bytes_read; current_message_len.reset(); current_message_type.reset(); current_payload.clear(); return read_state; } void tr_peerMsgsImpl::got_error(tr_peerIo* /*io*/, tr_error const& /*error*/, void* vmsgs) { static_cast(vmsgs)->publish(tr_peer_event::GotError(ENOTCONN)); } // --- void tr_peerMsgsImpl::pulse() { auto const now_sec = tr_time(); auto const now_msec = tr_time_msec(); update_desired_request_count(); update_block_requests(); update_metadata_requests(now_sec); for (;;) { if (fill_output_buffer(now_sec, now_msec) == 0U) { break; } } } void tr_peerMsgsImpl::update_metadata_requests(time_t now) const { if (!peer_supports_metadata_xfer_) { return; } if (auto const piece = tor_.get_next_metadata_request(now); piece) { auto tmp = tr_variant{}; tr_variantInitDict(&tmp, 3); tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Request); tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece); protocol_send_message(BtPeerMsgs::Ltep, ut_metadata_id_, tr_variant_serde::benc().to_string(tmp)); } } void tr_peerMsgsImpl::update_block_requests() { if (!tor_.client_can_download()) { return; } auto const n_active = tr_peerMgrCountActiveRequestsToPeer(&tor_, this); if (n_active >= desired_request_count_) { return; } TR_ASSERT(client_is_interested()); TR_ASSERT(!client_is_choked()); auto const n_wanted = desired_request_count_ - n_active; if (auto const requests = tr_peerMgrGetNextRequests(&tor_, this, n_wanted); !std::empty(requests)) { request_blocks(std::data(requests), std::size(requests)); } } [[nodiscard]] size_t tr_peerMsgsImpl::fill_output_buffer(time_t now_sec, uint64_t now_msec) { auto n_bytes_written = size_t{}; // fulfill metadata requests for (;;) { auto const old_len = n_bytes_written; n_bytes_written += add_next_metadata_piece(); if (old_len == n_bytes_written) { break; } } // fulfill piece requests for (;;) { auto const old_len = n_bytes_written; n_bytes_written += add_next_block(now_sec, now_msec); if (old_len == n_bytes_written) { break; } } if (client_sent_at_ != 0 && now_sec - client_sent_at_ > KeepaliveIntervalSecs) { n_bytes_written += protocol_send_keepalive(); } return n_bytes_written; } [[nodiscard]] size_t tr_peerMsgsImpl::add_next_metadata_piece() { auto const piece = pop_next_metadata_request(); if (!piece.has_value()) // no pending requests { return {}; } auto data = tor_.get_metadata_piece(*piece); if (!data) { // send a reject auto tmp = tr_variant{}; tr_variantInitDict(&tmp, 2); tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Reject); tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece); return protocol_send_message(BtPeerMsgs::Ltep, ut_metadata_id_, tr_variant_serde::benc().to_string(tmp)); } // send the metadata auto tmp = tr_variant{}; tr_variantInitDict(&tmp, 3); tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Data); tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece); tr_variantDictAddInt(&tmp, TR_KEY_total_size, tor_.info_dict_size()); return protocol_send_message(BtPeerMsgs::Ltep, ut_metadata_id_, tr_variant_serde::benc().to_string(tmp), *data); } [[nodiscard]] size_t tr_peerMsgsImpl::add_next_block(time_t now_sec, uint64_t now_msec) { if (std::empty(peer_requested_) || io_->get_write_buffer_space(now_msec) == 0U) { return {}; } auto const req = peer_requested_.front(); peer_requested_.pop_front(); auto buf = std::array{}; auto ok = is_valid_request(req) && tor_.has_piece(req.index); if (ok) { ok = tor_.ensure_piece_is_checked(req.index); if (!ok) { tor_.error().set_local_error(fmt::format("Please Verify Local Data! Piece #{:d} is corrupt.", req.index)); } } if (ok) { ok = session->cache->read_block(tor_, tor_.piece_loc(req.index, req.offset), req.length, std::data(buf)) == 0; } if (ok) { blocks_sent_to_peer.add(now_sec, 1); auto const piece_data = std::string_view{ reinterpret_cast(std::data(buf)), req.length }; return protocol_send_message(BtPeerMsgs::Piece, req.index, req.offset, piece_data); } if (io_->supports_fext()) { return protocol_send_reject(req); } return {}; } // --- bool tr_peerMsgsImpl::is_valid_request(peer_request const& req) const { int err = 0; if (req.index >= tor_.piece_count()) { err = 1; } else if (req.length < 1) { err = 2; } else if (req.offset + req.length > tor_.piece_size(req.index)) { err = 3; } else if (req.length > tr_block_info::BlockSize) { err = 4; } else if (tor_.piece_loc(req.index, req.offset, req.length).byte > tor_.total_size()) { err = 5; } if (err != 0) { tr_logAddTraceTor(&tor_, fmt::format("index {} offset {} length {} err {}", req.index, req.offset, req.length, err)); } return err == 0; } [[nodiscard]] bool tr_peerMsgsImpl::can_add_request_from_peer(peer_request const& req) { if (peer_is_choked()) { logtrace(this, "rejecting request from choked peer"); return false; } if (std::size(peer_requested_) >= ReqQ) { logtrace(this, "rejecting request ... reqq is full"); return false; } if (!is_valid_request(req)) { logtrace(this, "rejecting an invalid request."); return false; } if (!tor_.has_piece(req.index)) { logtrace(this, "rejecting request for a piece we don't have."); return false; } return true; } size_t tr_peerMsgsImpl::max_available_reqs() const { if (tor_.is_done() || !tor_.has_metainfo() || client_is_choked() || !client_is_interested()) { return 0; } // Get the rate limit we should use. // TODO: this needs to consider all the other peers as well... uint64_t const now = tr_time_msec(); auto rate = get_piece_speed(now, TR_PEER_TO_CLIENT); if (tor_.uses_speed_limit(TR_PEER_TO_CLIENT)) { rate = std::min(rate, tor_.speed_limit(TR_PEER_TO_CLIENT)); } // honor the session limits, if enabled if (tor_.uses_session_limits()) { if (auto const limit = session->active_speed_limit(TR_PEER_TO_CLIENT)) { rate = std::min(rate, *limit); } } // use this desired rate to figure out how // many requests we should send to this peer static auto constexpr Floor = size_t{ 32 }; static size_t constexpr Seconds = RequestBufSecs; size_t const estimated_blocks_in_period = (rate.base_quantity() * Seconds) / tr_block_info::BlockSize; auto const ceil = reqq_.value_or(250); return std::clamp(estimated_blocks_in_period, Floor, ceil); } } // namespace tr_peerMsgs::tr_peerMsgs( tr_torrent const& tor, std::shared_ptr peer_info_in, tr_interned_string user_agent, bool connection_is_encrypted, bool connection_is_incoming, bool connection_is_utp) : tr_peer{ tor } , peer_info{ std::move(peer_info_in) } , user_agent_{ user_agent } , connection_is_encrypted_{ connection_is_encrypted } , connection_is_incoming_{ connection_is_incoming } , connection_is_utp_{ connection_is_utp } { peer_info->set_connected(tr_time()); ++n_peers; } tr_peerMsgs::~tr_peerMsgs() { peer_info->set_connected(tr_time(), false); TR_ASSERT(n_peers > 0U); --n_peers; } tr_peerMsgs* tr_peerMsgs::create( tr_torrent& torrent, std::shared_ptr peer_info, std::shared_ptr io, tr_interned_string user_agent, tr_peer_callback_bt callback, void* callback_data) { return new tr_peerMsgsImpl{ torrent, std::move(peer_info), std::move(io), user_agent, callback, callback_data }; }