diff --git a/libtransmission/handshake.cc b/libtransmission/handshake.cc index 0753136c6..82720697b 100644 --- a/libtransmission/handshake.cc +++ b/libtransmission/handshake.cc @@ -561,7 +561,7 @@ ReadState tr_handshake::can_read(tr_peerIo* peer_io, void* vhandshake, size_t* p /* no piece data in handshake */ *piece = 0; - tr_logAddTraceHand(handshake, fmt::format("handling canRead; state is [{}]", handshake->state_string())); + tr_logAddTraceHand(handshake, fmt::format("handling can_read; state is [{}]", handshake->state_string())); ReadState ret = READ_NOW; while (ret == READ_NOW) diff --git a/libtransmission/magnet-metainfo.cc b/libtransmission/magnet-metainfo.cc index 3cb783aae..7086f7f23 100644 --- a/libtransmission/magnet-metainfo.cc +++ b/libtransmission/magnet-metainfo.cc @@ -134,7 +134,7 @@ std::optional parseBase32Hash(std::string_view sv) std::optional parseHash(std::string_view sv) { - // http://bittorrent.org/beps/bep_0009.html + // https://www.bittorrent.org/beps/bep_0009.html // Is the info-hash hex encoded, for a total of 40 characters. // For compatibility with existing links in the wild, clients // should also support the 32 character base32 encoded info-hash. @@ -153,7 +153,7 @@ std::optional parseHash(std::string_view sv) std::optional parseHash2(std::string_view sv) { - // http://bittorrent.org/beps/bep_0009.html + // https://www.bittorrent.org/beps/bep_0009.html // Is the info-hash v2 hex encoded and tag removed, for a total of 64 characters. if (auto const hash = tr_sha256_from_string(sv); hash) diff --git a/libtransmission/net.cc b/libtransmission/net.cc index b248f4b02..295c3dfde 100644 --- a/libtransmission/net.cc +++ b/libtransmission/net.cc @@ -271,7 +271,6 @@ tr_peer_socket tr_netOpenPeerSocket(tr_session* session, tr_socket_address const return {}; } - auto ret = tr_peer_socket{}; if (connect(s, reinterpret_cast(&sock), addrlen) == -1 && #ifdef _WIN32 sockerrno != WSAEWOULDBLOCK && @@ -291,15 +290,12 @@ tr_peer_socket tr_netOpenPeerSocket(tr_session* session, tr_socket_address const } tr_net_close_socket(s); - } - else - { - ret = tr_peer_socket{ session, socket_address, s }; + return {}; } tr_logAddTrace(fmt::format("New OUTGOING connection {} ({})", s, socket_address.display_name())); - return ret; + return { session, socket_address, s }; } namespace diff --git a/libtransmission/net.h b/libtransmission/net.h index fc23685f4..6e358745a 100644 --- a/libtransmission/net.h +++ b/libtransmission/net.h @@ -242,7 +242,7 @@ struct tr_address } addr; static auto constexpr CompactAddrBytes = std::array{ 4U, 16U }; - static auto constexpr CompactAddrMaxBytes = 16U; + static auto constexpr CompactAddrMaxBytes = *std::max_element(std::begin(CompactAddrBytes), std::end(CompactAddrBytes)); static_assert(std::size(CompactAddrBytes) == NUM_TR_AF_INET_TYPES); [[nodiscard]] static auto any(tr_address_type type) noexcept @@ -397,6 +397,7 @@ struct tr_socket_address static auto constexpr CompactSockAddrBytes = std::array{ tr_address::CompactAddrBytes[0] + tr_port::CompactPortBytes, tr_address::CompactAddrBytes[1] + tr_port::CompactPortBytes }; + static auto constexpr CompactSockAddrMaxBytes = tr_address::CompactAddrMaxBytes + tr_port::CompactPortBytes; static_assert(std::size(CompactSockAddrBytes) == NUM_TR_AF_INET_TYPES); }; diff --git a/libtransmission/peer-common.h b/libtransmission/peer-common.h index 55ba2a754..d53e2d835 100644 --- a/libtransmission/peer-common.h +++ b/libtransmission/peer-common.h @@ -26,9 +26,9 @@ * @{ */ -class tr_peer; class tr_swarm; struct tr_bandwidth; +struct tr_peer; // --- Peer Publish / Subscribe @@ -178,27 +178,26 @@ using tr_peer_callback_generic = void (*)(tr_peer* peer, tr_peer_event const& ev * @see tr_peer_info * @see tr_peerMsgs */ -class tr_peer +struct tr_peer { -public: using Speed = libtransmission::Values::Speed; - explicit tr_peer(tr_torrent const* tor); + explicit tr_peer(tr_torrent const& tor); virtual ~tr_peer(); [[nodiscard]] virtual Speed get_piece_speed(uint64_t now, tr_direction direction) const = 0; - [[nodiscard]] bool hasPiece(tr_piece_index_t piece) const noexcept + [[nodiscard]] bool has_piece(tr_piece_index_t piece) const noexcept { return has().test(piece); } - [[nodiscard]] float percentDone() const noexcept + [[nodiscard]] float percent_done() const noexcept { return has().percent(); } - [[nodiscard]] bool isSeed() const noexcept + [[nodiscard]] bool is_seed() const noexcept { return has().has_all(); } @@ -208,22 +207,13 @@ public: [[nodiscard]] virtual tr_bitfield const& has() const noexcept = 0; // requests that have been made but haven't been fulfilled yet - [[nodiscard]] virtual size_t activeReqCount(tr_direction) const noexcept = 0; + [[nodiscard]] virtual size_t active_req_count(tr_direction) const noexcept = 0; - virtual void requestBlocks(tr_block_span_t const* block_spans, size_t n_spans) = 0; + virtual void request_blocks(tr_block_span_t const* block_spans, size_t n_spans) = 0; - struct RequestLimit + virtual void cancel_block_request(tr_block_index_t /*block*/) { - // How many blocks we could request. - size_t max_spans = 0; - - // How many spans those blocks could be in. - // This is for webseeds, which make parallel requests. - size_t max_blocks = 0; - }; - - // how many blocks could we request from this peer right now? - [[nodiscard]] virtual RequestLimit canRequest() const noexcept = 0; + } tr_session* const session; diff --git a/libtransmission/peer-mgr-active-requests.cc b/libtransmission/peer-mgr-active-requests.cc index 0a7d87ea4..c66a60bf3 100644 --- a/libtransmission/peer-mgr-active-requests.cc +++ b/libtransmission/peer-mgr-active-requests.cc @@ -21,7 +21,7 @@ #include "libtransmission/peer-mgr-wishlist.h" #include "libtransmission/tr-assert.h" -class tr_peer; +struct tr_peer; class ActiveRequests::Impl { diff --git a/libtransmission/peer-mgr-active-requests.h b/libtransmission/peer-mgr-active-requests.h index 325bd5796..1116e44f6 100644 --- a/libtransmission/peer-mgr-active-requests.h +++ b/libtransmission/peer-mgr-active-requests.h @@ -17,7 +17,7 @@ #include "libtransmission/transmission.h" // tr_block_index_t -class tr_peer; +struct tr_peer; /** * Bookkeeping for the active requests we have -- diff --git a/libtransmission/peer-mgr.cc b/libtransmission/peer-mgr.cc index 0ff70b980..8650c3cab 100644 --- a/libtransmission/peer-mgr.cc +++ b/libtransmission/peer-mgr.cc @@ -646,7 +646,7 @@ private: webseeds.reserve(n); for (size_t i = 0; i < n; ++i) { - webseeds.emplace_back(tr_webseedNew(tor, tor->webseed(i), &tr_swarm::peer_callback_webseed, this)); + webseeds.emplace_back(tr_webseedNew(*tor, tor->webseed(i), &tr_swarm::peer_callback_webseed, this)); } webseeds.shrink_to_fit(); @@ -681,11 +681,9 @@ private: static void maybe_send_cancel_request(tr_peer* peer, tr_block_index_t block, tr_peer const* muted) { - auto* msgs = dynamic_cast(peer); - if (msgs != nullptr && msgs != muted) + if (peer != nullptr && peer != muted) { - peer->cancels_sent_to_peer.add(tr_time(), 1); - msgs->cancel_block_request(block); + peer->cancel_block_request(block); } } @@ -780,9 +778,9 @@ private: // didn't have the metadata before now... so refresh them all... for (auto* peer : peers) { - peer->onTorrentGotMetainfo(); + peer->on_torrent_got_metainfo(); - if (peer->isSeed()) + if (peer->is_seed()) { mark_peer_as_seed(*peer->peer_info); } @@ -1006,7 +1004,7 @@ size_t tr_swarm::WishlistMediator::count_piece_replication(tr_piece_index_t piec std::begin(swarm_.peers), std::end(swarm_.peers), size_t{}, - [piece](size_t acc, tr_peer* peer) { return acc + (peer->hasPiece(piece) ? 1U : 0U); }); + [piece](size_t acc, tr_peer* peer) { return acc + (peer->has_piece(piece) ? 1U : 0U); }); } tr_block_span_t tr_swarm::WishlistMediator::block_span(tr_piece_index_t piece) const @@ -1195,10 +1193,10 @@ private: // --- tr_peer virtual functions -tr_peer::tr_peer(tr_torrent const* tor) - : session{ tor->session } - , swarm{ tor->swarm } - , blame{ tor->block_count() } +tr_peer::tr_peer(tr_torrent const& tor) + : session{ tor.session } + , swarm{ tor.swarm } + , blame{ tor.block_count() } { } @@ -1263,7 +1261,7 @@ std::vector tr_peerMgrGetNextRequests(tr_torrent* torrent, tr_p swarm.update_endgame(); return swarm.wishlist->next( numwant, - [peer](tr_piece_index_t p) { return peer->hasPiece(p); }, + [peer](tr_piece_index_t p) { return peer->has_piece(p); }, [peer, &swarm](tr_block_index_t b) { return swarm.active_requests.has(b, peer); }); } @@ -1294,15 +1292,14 @@ namespace { namespace handshake_helpers { -void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr io, tr_peer_info* peer_info, tr_interned_string client) +void create_bit_torrent_peer(tr_torrent& tor, std::shared_ptr io, tr_peer_info* peer_info, tr_interned_string client) { TR_ASSERT(peer_info != nullptr); - TR_ASSERT(tr_isTorrent(tor)); - TR_ASSERT(tor->swarm != nullptr); + TR_ASSERT(tor.swarm != nullptr); - tr_swarm* swarm = tor->swarm; + tr_swarm* swarm = tor.swarm; - auto* peer = tr_peerMsgsNew(tor, peer_info, std::move(io), client, &tr_swarm::peer_callback_bt, swarm); + auto* peer = tr_peerMsgs::create(tor, peer_info, std::move(io), client, &tr_swarm::peer_callback_bt, swarm); swarm->peers.push_back(peer); @@ -1399,7 +1396,7 @@ void create_bit_torrent_peer(tr_torrent* tor, std::shared_ptr io, tr_ } result.io->set_bandwidth(&swarm->tor->bandwidth()); - create_bit_torrent_peer(swarm->tor, result.io, info, client); + create_bit_torrent_peer(*swarm->tor, result.io, info, client); return true; } @@ -1635,7 +1632,7 @@ int8_t tr_peerMgrPieceAvailability(tr_torrent const* tor, tr_piece_index_t piece } auto const& peers = tor->swarm->peers; - return std::count_if(std::begin(peers), std::end(peers), [piece](auto const* peer) { return peer->hasPiece(piece); }); + return std::count_if(std::begin(peers), std::end(peers), [piece](auto const* peer) { return peer->has_piece(piece); }); } void tr_peerMgrTorrentAvailability(tr_torrent const* tor, int8_t* tab, unsigned int n_tabs) @@ -1741,7 +1738,7 @@ namespace peer_stat_helpers stats.client = peer->user_agent().c_str(); stats.port = port.host(); stats.from = peer->peer_info->from_first(); - stats.progress = peer->percentDone(); + stats.progress = peer->percent_done(); stats.isUTP = peer->is_utp_connection(); stats.isEncrypted = peer->is_encrypted(); stats.rateToPeer_KBps = peer->get_piece_speed(now_msec, TR_CLIENT_TO_PEER).count(Speed::Units::KByps); @@ -1753,15 +1750,15 @@ namespace peer_stat_helpers stats.isIncoming = peer->is_incoming_connection(); stats.isDownloadingFrom = peer->is_active(TR_PEER_TO_CLIENT); stats.isUploadingTo = peer->is_active(TR_CLIENT_TO_PEER); - stats.isSeed = peer->isSeed(); + stats.isSeed = peer->is_seed(); stats.blocksToPeer = peer->blocks_sent_to_peer.count(now, CancelHistorySec); stats.blocksToClient = peer->blocks_sent_to_client.count(now, CancelHistorySec); stats.cancelsToPeer = peer->cancels_sent_to_peer.count(now, CancelHistorySec); stats.cancelsToClient = peer->cancels_sent_to_client.count(now, CancelHistorySec); - stats.activeReqsToPeer = peer->activeReqCount(TR_CLIENT_TO_PEER); - stats.activeReqsToClient = peer->activeReqCount(TR_PEER_TO_CLIENT); + stats.activeReqsToPeer = peer->active_req_count(TR_CLIENT_TO_PEER); + stats.activeReqsToClient = peer->active_req_count(TR_PEER_TO_CLIENT); char* pch = stats.flagStr; @@ -1876,14 +1873,14 @@ namespace update_interest_helpers TR_ASSERT(!tor->is_done()); TR_ASSERT(tor->client_can_download()); - if (peer->isSeed()) + if (peer->is_seed()) { return true; } for (tr_piece_index_t i = 0; i < tor->piece_count(); ++i) { - if (piece_is_interesting[i] && peer->hasPiece(i)) + if (piece_is_interesting[i] && peer->has_piece(i)) { return true; } @@ -1904,11 +1901,11 @@ void updateInterest(tr_swarm* swarm) if (auto const& peers = swarm->peers; !std::empty(peers)) { - int const n = tor->piece_count(); + auto const n = tor->piece_count(); // build a bitfield of interesting pieces... auto piece_is_interesting = std::vector(n); - for (int i = 0; i < n; ++i) + for (tr_piece_index_t i = 0U; i < n; ++i) { piece_is_interesting[i] = tor->piece_is_wanted(i) && !tor->has_piece(i); } @@ -2019,7 +2016,7 @@ void rechokeUploads(tr_swarm* s, uint64_t const now) auto salter = tr_salt_shaker{}; for (auto* const peer : peers) { - if (peer->isSeed()) + if (peer->is_seed()) { /* choke seeds and partial seeds */ peer->set_choke(true); @@ -2160,7 +2157,7 @@ auto constexpr MaxUploadIdleSecs = time_t{ 60 * 5 }; auto const* const info = peer->peer_info; /* disconnect if we're both seeds and enough time has passed for PEX */ - if (tor->is_done() && peer->isSeed()) + if (tor->is_done() && peer->is_seed()) { return !tor->allows_pex() || info->idle_secs(now).value_or(0U) >= 30U; } diff --git a/libtransmission/peer-mgr.h b/libtransmission/peer-mgr.h index c2b2262cf..398b78c7c 100644 --- a/libtransmission/peer-mgr.h +++ b/libtransmission/peer-mgr.h @@ -30,8 +30,8 @@ * @{ */ -class tr_peer; class tr_peer_socket; +struct tr_peer; struct tr_peerMgr; struct tr_peer_stat; struct tr_session; diff --git a/libtransmission/peer-msgs.cc b/libtransmission/peer-msgs.cc index c308abb5a..b521404c9 100644 --- a/libtransmission/peer-msgs.cc +++ b/libtransmission/peer-msgs.cc @@ -10,8 +10,8 @@ #include #include // uint8_t, uint32_t, int64_t #include +#include #include -#include #include // std::unique_ptr #include #include @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -58,19 +59,17 @@ struct tr_error; using namespace std::literals; +namespace +{ // initial capacity is big enough to hold a BtPeerMsgs::Piece message using MessageBuffer = libtransmission::StackBuffer>; using MessageReader = libtransmission::BufferReader; using MessageWriter = libtransmission::BufferWriter; -namespace -{ - // these values are hardcoded by various BEPs as noted namespace BtPeerMsgs { - -// http://bittorrent.org/beps/bep_0003.html#peer-messages +// https://www.bittorrent.org/beps/bep_0003.html#peer-messages auto constexpr Choke = uint8_t{ 0 }; auto constexpr Unchoke = uint8_t{ 1 }; auto constexpr Interested = uint8_t{ 2 }; @@ -81,7 +80,7 @@ auto constexpr Request = uint8_t{ 6 }; auto constexpr Piece = uint8_t{ 7 }; auto constexpr Cancel = uint8_t{ 8 }; -// http://bittorrent.org/beps/bep_0005.html +// https://www.bittorrent.org/beps/bep_0005.html#bittorrent-protocol-extension auto constexpr Port = uint8_t{ 9 }; // https://www.bittorrent.org/beps/bep_0006.html @@ -91,7 +90,7 @@ auto constexpr FextHaveNone = uint8_t{ 15 }; auto constexpr FextReject = uint8_t{ 16 }; auto constexpr FextAllowedFast = uint8_t{ 17 }; -// http://bittorrent.org/beps/bep_0010.html +// https://www.bittorrent.org/beps/bep_0010.html // see also LtepMessageIds below auto constexpr Ltep = uint8_t{ 20 }; @@ -141,12 +140,12 @@ auto constexpr Ltep = uint8_t{ 20 }; namespace LtepMessages { -// http://bittorrent.org/beps/bep_0010.html +// https://www.bittorrent.org/beps/bep_0010.html auto constexpr Handshake = uint8_t{ 0 }; } // namespace LtepMessages -// http://bittorrent.org/beps/bep_0010.html +// https://www.bittorrent.org/beps/bep_0010.html // Client-defined extension message IDs that we tell peers about // in the LTEP handshake and will respond to when sent in an LTEP // message. @@ -162,7 +161,7 @@ enum LtepMessageIds : uint8_t UT_METADATA_ID = 3, }; -// http://bittorrent.org/beps/bep_0009.html +// https://www.bittorrent.org/beps/bep_0009.html namespace MetadataMsgType { @@ -172,25 +171,23 @@ auto constexpr Reject = 2; } // namespace MetadataMsgType -auto constexpr MinChokePeriodSec = 10; +auto constexpr MinChokePeriodSec = time_t{ 10 }; // idle seconds before we send a keepalive -auto constexpr KeepaliveIntervalSecs = 100; +auto constexpr KeepaliveIntervalSecs = time_t{ 100 }; -auto constexpr MetadataReqQ = 64; +auto constexpr MetadataReqQ = size_t{ 64U }; auto constexpr ReqQ = 512; -// used in lowering the outMessages queue period - // when we're making requests from another peer, // batch them together to send enough requests to // meet our bandwidth goals for the next N seconds -auto constexpr RequestBufSecs = 10; +auto constexpr RequestBufSecs = time_t{ 10 }; // --- -auto constexpr MaxPexPeerCount = size_t{ 50 }; +auto constexpr MaxPexPeerCount = size_t{ 50U }; // --- @@ -213,13 +210,13 @@ struct peer_request { return this->index == that.index && this->offset == that.offset && this->length == that.length; } -}; -peer_request blockToReq(tr_torrent const* tor, tr_block_index_t block) -{ - auto const loc = tor->block_loc(block); - return peer_request{ loc.piece, loc.piece_offset, tor->block_size(block) }; -} + [[nodiscard]] static auto from_block(tr_torrent const& tor, tr_block_index_t block) noexcept + { + auto const loc = tor.block_loc(block); + return peer_request{ loc.piece, loc.piece_offset, tor.block_size(block) }; + } +}; // --- @@ -263,62 +260,12 @@ struct tr_incoming private: std::bitset have_; - size_t const block_size_; + uint32_t const block_size_; }; - std::map blocks; + std::unordered_map blocks; }; -[[nodiscard]] bool is_valid_request(tr_torrent const& tor, tr_piece_index_t index, uint32_t offset, uint32_t length) -{ - int err = 0; - - if (index >= tor.piece_count()) - { - err = 1; - } - else if (length < 1) - { - err = 2; - } - else if (offset + length > tor.piece_size(index)) - { - err = 3; - } - else if (length > tr_block_info::BlockSize) - { - err = 4; - } - else if (tor.piece_loc(index, offset, length).byte > tor.total_size()) - { - err = 5; - } - - if (err != 0) - { - tr_logAddTraceTor(&tor, fmt::format("index {} offset {} length {} err {}", index, offset, length, err)); - } - - return err == 0; -} - -class tr_peerMsgsImpl; -// TODO: make these to be member functions -ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece); -void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs); -void didWrite(tr_peerIo* io, size_t bytes_written, bool was_piece_data, void* vmsgs); -void gotError(tr_peerIo* io, tr_error const& err, void* vmsgs); -void peerPulse(void* vmsgs); -size_t protocolSendCancel(tr_peerMsgsImpl* msgs, struct peer_request const& req); -size_t protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke); -size_t protocolSendHave(tr_peerMsgsImpl* msgs, tr_piece_index_t index); -size_t protocolSendPort(tr_peerMsgsImpl* msgs, tr_port port); -size_t protocolSendRequest(tr_peerMsgsImpl* msgs, struct peer_request const& req); -void sendInterest(tr_peerMsgsImpl* msgs, bool b); -void sendLtepHandshake(tr_peerMsgsImpl* msgs); -void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs); -void updateDesiredRequestCount(tr_peerMsgsImpl* msgs); - #define myLogMacro(msgs, level, text) \ do \ { \ @@ -328,8 +275,8 @@ void updateDesiredRequestCount(tr_peerMsgsImpl* msgs); __FILE__, \ __LINE__, \ (level), \ - fmt::format("{:s} [{:s}]: {:s}", (msgs)->io->display_name(), (msgs)->user_agent().sv(), text), \ - (msgs)->torrent->name()); \ + fmt::format("{:s} [{:s}]: {:s}", (msgs)->display_name(), (msgs)->user_agent().sv(), text), \ + (msgs)->tor_.name()); \ } \ } while (0) @@ -357,39 +304,39 @@ class tr_peerMsgsImpl final : public tr_peerMsgs { public: tr_peerMsgsImpl( - tr_torrent* torrent_in, + tr_torrent& torrent_in, tr_peer_info* const peer_info_in, std::shared_ptr io_in, tr_interned_string client, tr_peer_callback_bt callback, void* callback_data) : tr_peerMsgs{ torrent_in, peer_info_in, client, io_in->is_encrypted(), io_in->is_incoming(), io_in->is_utp() } - , torrent{ torrent_in } - , io{ std::move(io_in) } - , have_{ torrent_in->piece_count() } + , tor_{ torrent_in } + , io_{ std::move(io_in) } + , have_{ torrent_in.piece_count() } , callback_{ callback } , callback_data_{ callback_data } { - if (torrent->allows_pex()) + if (tor_.allows_pex()) { - pex_timer_ = session->timerMaker().create([this]() { sendPex(); }); + pex_timer_ = session->timerMaker().create([this]() { send_ut_pex(); }); pex_timer_->start_repeating(SendPexInterval); } - if (io->supports_ltep()) + if (io_->supports_ltep()) { - sendLtepHandshake(this); + send_ltep_handshake(); } - tellPeerWhatWeHave(this); + protocol_send_bitfield(); - if (session->allowsDHT() && io->supports_dht()) + if (session->allowsDHT() && io_->supports_dht()) { - protocolSendPort(this, session->udpPort()); + protocol_send_port(session->udpPort()); } - io->set_callbacks(canRead, didWrite, gotError, this); - updateDesiredRequestCount(this); + io_->set_callbacks(can_read, did_write, got_error, this); + update_desired_request_count(); update_active(); } @@ -404,23 +351,25 @@ public: set_active(TR_UP, false); set_active(TR_DOWN, false); - if (io) + if (io_) { - io->clear(); + io_->clear(); } } + // --- + [[nodiscard]] Speed get_piece_speed(uint64_t now, tr_direction dir) const override { - return io->get_piece_speed(now, dir); + return io_->get_piece_speed(now, dir); } - [[nodiscard]] size_t activeReqCount(tr_direction dir) const noexcept override + [[nodiscard]] size_t active_req_count(tr_direction dir) const noexcept override { switch (dir) { case TR_CLIENT_TO_PEER: // requests we sent - return tr_peerMgrCountActiveRequestsToPeer(torrent, this); + return tr_peerMgrCountActiveRequestsToPeer(&tor_, this); case TR_PEER_TO_CLIENT: // requests they sent return std::size(peer_requested_); @@ -433,7 +382,7 @@ public: [[nodiscard]] tr_socket_address socket_address() const override { - return io->socket_address(); + return io_->socket_address(); } [[nodiscard]] std::string display_name() const override @@ -446,58 +395,54 @@ public: return have_; } - void onTorrentGotMetainfo() noexcept override - { - invalidatePercentDone(); + // --- + void on_torrent_got_metainfo() noexcept override + { update_active(); } - void invalidatePercentDone() - { - updateInterest(); - } - void cancel_block_request(tr_block_index_t block) override { - protocolSendCancel(this, blockToReq(torrent, block)); + cancels_sent_to_peer.add(tr_time(), 1); + protocol_send_cancel(peer_request::from_block(tor_, block)); } void set_choke(bool peer_is_choked) override { - time_t const now = tr_time(); - time_t const fibrillation_time = now - MinChokePeriodSec; + auto const now = tr_time(); + auto const fibrillation_time = now - MinChokePeriodSec; - if (chokeChangedAt > fibrillation_time) + if (choke_changed_at_ > fibrillation_time) { - // TODO logtrace(msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked); + logtrace(this, fmt::format("Not changing choke to {} to avoid fibrillation", peer_is_choked)); } else if (this->peer_is_choked() != peer_is_choked) { set_peer_choked(peer_is_choked); + // https://www.bittorrent.org/beps/bep_0006.html#reject-request + // A peer SHOULD choke first and then reject requests so that + // the peer receiving the choke does not re-request the pieces. + protocol_send_choke(peer_is_choked); if (peer_is_choked) { - cancelAllRequestsToClient(this); + reject_all_requests(); } - protocolSendChoke(this, peer_is_choked); - chokeChangedAt = now; + choke_changed_at_ = now; update_active(TR_CLIENT_TO_PEER); } } - void pulse() override - { - peerPulse(this); - } + void pulse() override; void on_piece_completed(tr_piece_index_t piece) override { - protocolSendHave(this, piece); + protocol_send_have(piece); // since we have more pieces now, we might not be interested in this peer - updateInterest(); + update_interest(); } void set_interested(bool interested) override @@ -505,26 +450,16 @@ public: if (client_is_interested() != interested) { set_client_interested(interested); - sendInterest(this, interested); + protocol_send_interest(interested); update_active(TR_PEER_TO_CLIENT); } } - void updateInterest() - { - // TODO -- might need to poke the mgr on startup - } + // --- - // - - [[nodiscard]] bool isValidRequest(peer_request const& req) const + void request_blocks(tr_block_span_t const* block_spans, size_t n_spans) override { - return is_valid_request(*torrent, req.index, req.offset, req.length); - } - - void requestBlocks(tr_block_span_t const* block_spans, size_t n_spans) override - { - TR_ASSERT(torrent->client_can_download()); + TR_ASSERT(tor_.client_can_download()); TR_ASSERT(client_is_interested()); TR_ASSERT(!client_is_choked()); @@ -535,99 +470,24 @@ public: // Note that requests can't cross over a piece boundary. // So if a piece isn't evenly divisible by the block size, // we need to split our block request info per-piece chunks. - auto const byte_begin = torrent->block_loc(block).byte; - auto const block_size = torrent->block_size(block); + auto const byte_begin = tor_.block_loc(block).byte; + auto const block_size = tor_.block_size(block); auto const byte_end = byte_begin + block_size; for (auto offset = byte_begin; offset < byte_end;) { - auto const loc = torrent->byte_loc(offset); + auto const loc = tor_.byte_loc(offset); auto const left_in_block = block_size - loc.block_offset; - auto const left_in_piece = torrent->piece_size(loc.piece) - loc.piece_offset; + auto const left_in_piece = tor_.piece_size(loc.piece) - loc.piece_offset; auto const req_len = std::min(left_in_block, left_in_piece); - protocolSendRequest(this, { loc.piece, loc.piece_offset, req_len }); + protocol_send_request({ loc.piece, loc.piece_offset, req_len }); offset += req_len; } } - tr_peerMgrClientSentRequests(torrent, this, *span); + tr_peerMgrClientSentRequests(&tor_, this, *span); } } - // how many blocks could we request from this peer right now? - [[nodiscard]] RequestLimit canRequest() const noexcept override - { - auto const max_blocks = maxAvailableReqs(); - return RequestLimit{ max_blocks, max_blocks }; - } - - void sendPex(); - - void publish(tr_peer_event const& peer_event) - { - if (callback_ != nullptr) - { - (*callback_)(this, peer_event, callback_data_); - } - } - -private: - [[nodiscard]] size_t maxAvailableReqs() const - { - if (torrent->is_done() || !torrent->has_metainfo() || client_is_choked() || !client_is_interested()) - { - return 0; - } - - // Get the rate limit we should use. - // TODO: this needs to consider all the other peers as well... - uint64_t const now = tr_time_msec(); - auto rate = get_piece_speed(now, TR_PEER_TO_CLIENT); - if (torrent->uses_speed_limit(TR_PEER_TO_CLIENT)) - { - rate = std::min(rate, torrent->speed_limit(TR_PEER_TO_CLIENT)); - } - - // honor the session limits, if enabled - if (torrent->uses_session_limits()) - { - if (auto const limit = torrent->session->active_speed_limit(TR_PEER_TO_CLIENT); limit) - { - rate = std::min(rate, *limit); - } - } - - // use this desired rate to figure out how - // many requests we should send to this peer - size_t constexpr Floor = 32; - size_t constexpr Seconds = RequestBufSecs; - size_t const estimated_blocks_in_period = (rate.base_quantity() * Seconds) / tr_block_info::BlockSize; - size_t const ceil = reqq ? *reqq : 250; - - auto max_reqs = estimated_blocks_in_period; - max_reqs = std::min(max_reqs, ceil); - max_reqs = std::max(max_reqs, Floor); - return max_reqs; - } - - [[nodiscard]] bool calculate_active(tr_direction direction) const - { - if (direction == TR_CLIENT_TO_PEER) - { - return peer_is_interested() && !peer_is_choked(); - } - - // TR_PEER_TO_CLIENT - - if (!torrent->has_metainfo()) - { - return true; - } - - auto const active = client_is_interested() && !client_is_choked(); - TR_ASSERT(!active || !torrent->is_done()); - return active; - } - void update_active() { update_active(TR_UP); @@ -637,66 +497,227 @@ private: void update_active(tr_direction direction) { TR_ASSERT(tr_isDirection(direction)); - set_active(direction, calculate_active(direction)); } -public: - bool peerSupportsPex = false; - bool peerSupportsMetadataXfer = false; - bool clientSentLtepHandshake = false; + [[nodiscard]] bool calculate_active(tr_direction direction) const + { + if (direction == TR_CLIENT_TO_PEER) + { + return peer_is_interested() && !peer_is_choked(); + } - size_t desired_request_count = 0; + // TR_PEER_TO_CLIENT - uint8_t ut_pex_id = 0; - uint8_t ut_metadata_id = 0; + if (!tor_.has_metainfo()) + { + return true; + } - tr_port dht_port; + auto const active = client_is_interested() && !client_is_choked(); + TR_ASSERT(!active || !tor_.is_done()); + return active; + } - EncryptionPreference encryption_preference = EncryptionPreference::Unknown; +private: + // --- - tr_torrent* const torrent; + void update_interest() + { + // TODO(ckerr) -- might need to poke the mgr on startup - std::shared_ptr const io; + // additional note (tearfur) + // by "poke the mgr", Charles probably meant calling isPeerInteresting(), + // then pass the result to set_interesting() + } - std::vector peer_requested_; + // --- - std::array, NUM_TR_AF_INET_TYPES> pex; + [[nodiscard]] bool is_valid_request(peer_request const& req) const; - std::queue peerAskedForMetadata; + void reject_all_requests() + { + auto& queue = peer_requested_; - time_t clientSentAnythingAt = 0; + if (auto const must_send_rej = io_->supports_fext(); must_send_rej) + { + std::for_each(std::begin(queue), std::end(queue), [this](peer_request const& req) { protocol_send_reject(req); }); + } - time_t chokeChangedAt = 0; + queue.clear(); + } - /* when we started batching the outMessages */ - // time_t outMessagesBatchedAt = 0; + [[nodiscard]] bool can_add_request_from_peer(peer_request const& req); - struct tr_incoming incoming = {}; + void on_peer_made_request(peer_request const& req) + { + if (can_add_request_from_peer(req)) + { + peer_requested_.emplace_back(req); + } + else if (io_->supports_fext()) + { + protocol_send_reject(req); + } + } - /* if the peer supports the Extension Protocol in BEP 10 and - supplied a reqq argument, it's stored here. */ - std::optional reqq; + // how many blocks could we request from this peer right now? + [[nodiscard]] size_t max_available_reqs() const; + + void update_desired_request_count() + { + desired_request_count_ = max_available_reqs(); + } + + void update_block_requests(); + + // --- + + [[nodiscard]] std::optional pop_next_metadata_request() + { + auto& reqs = peer_requested_metadata_pieces_; + + if (std::empty(reqs)) + { + return {}; + } + + auto next = reqs.front(); + reqs.pop(); + return next; + } + + void update_metadata_requests(time_t now) const; + [[nodiscard]] size_t add_next_metadata_piece(); + [[nodiscard]] size_t add_next_block(time_t now_sec, uint64_t now_msec); + [[nodiscard]] size_t fill_output_buffer(time_t now_sec, uint64_t now_msec); + + // --- + + void send_ltep_handshake(); + void parse_ltep_handshake(MessageReader& payload); + void parse_ut_metadata(MessageReader& payload_in); + void parse_ut_pex(MessageReader& payload); + void parse_ltep(MessageReader& payload); + + void send_ut_pex(); + + int client_got_block(std::unique_ptr block_data, tr_block_index_t block); + ReadResult read_piece_data(MessageReader& payload); + ReadResult process_peer_message(uint8_t id, MessageReader& payload); + + // --- + + size_t protocol_send_keepalive() const; + + template + size_t protocol_send_message(uint8_t type, Args const&... args) const; + + size_t protocol_send_reject(peer_request const& req) const + { + TR_ASSERT(io_->supports_fext()); + return protocol_send_message(BtPeerMsgs::FextReject, req.index, req.offset, req.length); + } + + size_t protocol_send_cancel(peer_request const& req) const + { + return protocol_send_message(BtPeerMsgs::Cancel, req.index, req.offset, req.length); + } + + size_t protocol_send_request(peer_request const& req) const + { + TR_ASSERT(is_valid_request(req)); + return protocol_send_message(BtPeerMsgs::Request, req.index, req.offset, req.length); + } + + size_t protocol_send_port(tr_port const port) const + { + return protocol_send_message(BtPeerMsgs::Port, port.host()); + } + + size_t protocol_send_have(tr_piece_index_t const index) const + { + static_assert(sizeof(tr_piece_index_t) == sizeof(uint32_t)); + return protocol_send_message(BtPeerMsgs::Have, index); + } + + size_t protocol_send_choke(bool const choke) const + { + return protocol_send_message(choke ? BtPeerMsgs::Choke : BtPeerMsgs::Unchoke); + } + + void protocol_send_interest(bool const b) const + { + protocol_send_message(b ? BtPeerMsgs::Interested : BtPeerMsgs::NotInterested); + } + + void protocol_send_bitfield(); + + // --- + + void publish(tr_peer_event const& peer_event) + { + if (callback_ != nullptr) + { + (*callback_)(this, peer_event, callback_data_); + } + } + + // --- + + static void did_write(tr_peerIo* /*io*/, size_t bytes_written, bool was_piece_data, void* vmsgs); + static ReadState can_read(tr_peerIo* io, void* vmsgs, size_t* piece); + static void got_error(tr_peerIo* /*io*/, tr_error const& /*error*/, void* vmsgs); + + // --- + + bool peer_supports_pex_ = false; + bool peer_supports_metadata_xfer_ = false; + bool client_sent_ltep_handshake_ = false; + + size_t desired_request_count_ = 0; + + uint8_t ut_pex_id_ = 0; + uint8_t ut_metadata_id_ = 0; + + tr_port dht_port_; + + EncryptionPreference encryption_preference_ = EncryptionPreference::Unknown; + + tr_torrent& tor_; + + std::shared_ptr const io_; + + std::deque peer_requested_; + + std::array, NUM_TR_AF_INET_TYPES> pex_; + + std::queue peer_requested_metadata_pieces_; + + time_t client_sent_at_ = 0; + + time_t choke_changed_at_ = 0; + + tr_incoming incoming_ = {}; + + // if the peer supports the Extension Protocol in BEP 10 and + // supplied a reqq argument, it's stored here. + std::optional reqq_; std::unique_ptr pex_timer_; tr_bitfield have_; -private: - friend ReadResult process_peer_message(tr_peerMsgsImpl* msgs, uint8_t id, MessageReader& payload); - friend void parseLtepHandshake(tr_peerMsgsImpl* msgs, MessageReader& payload); - friend void parseUtMetadata(tr_peerMsgsImpl* msgs, MessageReader& payload_in); - tr_peer_callback_bt const callback_; void* const callback_data_; - // seconds between periodic sendPex() calls + // seconds between periodic send_ut_pex() calls static auto constexpr SendPexInterval = 90s; }; // --- -[[nodiscard]] constexpr bool messageLengthIsCorrect(tr_torrent const* const tor, uint8_t id, uint32_t len) +[[nodiscard]] constexpr bool is_message_length_correct(tr_torrent const& tor, uint8_t id, uint32_t len) { switch (id) { @@ -714,7 +735,7 @@ private: return len == 5U; case BtPeerMsgs::Bitfield: - return !tor->has_metainfo() || len == 1 + ((tor->piece_count() + 7U) / 8U); + return !tor.has_metainfo() || len == 1 + ((tor.piece_count() + 7U) / 8U); case BtPeerMsgs::Request: case BtPeerMsgs::Cancel: @@ -738,9 +759,6 @@ private: namespace protocol_send_message_helpers { -namespace -{ - [[nodiscard]] constexpr auto get_param_length(uint8_t param) noexcept { return sizeof(param); @@ -815,388 +833,107 @@ template (text.append(log_param(args)), ...); return text; } -} // namespace template -void build_peer_message(tr_peerMsgsImpl const* const msgs, MessageWriter& out, uint8_t type, Args const&... args) +size_t build_peer_message(MessageWriter& out, uint8_t type, Args const&... args) { - logtrace(msgs, build_log_message(type, args...)); - auto msg_len = sizeof(type); ((msg_len += get_param_length(args)), ...); out.add_uint32(msg_len); out.add_uint8(type); (add_param(out, args), ...); - TR_ASSERT(messageLengthIsCorrect(msgs->torrent, type, msg_len)); + return msg_len; } } // namespace protocol_send_message_helpers template -size_t protocol_send_message(tr_peerMsgsImpl const* const msgs, uint8_t type, Args const&... args) +size_t tr_peerMsgsImpl::protocol_send_message(uint8_t type, Args const&... args) const { using namespace protocol_send_message_helpers; + logtrace(this, build_log_message(type, args...)); + auto out = MessageBuffer{}; - build_peer_message(msgs, out, type, args...); + [[maybe_unused]] auto const msg_len = build_peer_message(out, type, args...); + TR_ASSERT(is_message_length_correct(tor_, type, msg_len)); auto const n_bytes_added = std::size(out); - msgs->io->write(out, type == BtPeerMsgs::Piece); + io_->write(out, type == BtPeerMsgs::Piece); return n_bytes_added; } -size_t protocol_send_keepalive(tr_peerMsgsImpl* msgs) +void tr_peerMsgsImpl::protocol_send_bitfield() { - logtrace(msgs, "sending 'keepalive'"); + bool const fext = io_->supports_fext(); + + if (fext && tor_.has_all()) + { + protocol_send_message(BtPeerMsgs::FextHaveAll); + } + else if (fext && tor_.has_none()) + { + protocol_send_message(BtPeerMsgs::FextHaveNone); + } + else if (!tor_.has_none()) + { + // https://www.bittorrent.org/beps/bep_0003.html#peer-messages + // Downloaders which don't have anything yet may skip the 'bitfield' message. + protocol_send_message(BtPeerMsgs::Bitfield, tor_.create_piece_bitfield()); + } +} + +size_t tr_peerMsgsImpl::protocol_send_keepalive() const +{ + logtrace(this, "sending 'keepalive'"); auto out = MessageBuffer{}; out.add_uint32(0); auto const n_bytes_added = std::size(out); - msgs->io->write(out, false); + io_->write(out, false); return n_bytes_added; } -auto protocolSendReject(tr_peerMsgsImpl* const msgs, struct peer_request const* req) -{ - TR_ASSERT(msgs->io->supports_fext()); - return protocol_send_message(msgs, BtPeerMsgs::FextReject, req->index, req->offset, req->length); -} - -size_t protocolSendCancel(tr_peerMsgsImpl* const msgs, peer_request const& req) -{ - return protocol_send_message(msgs, BtPeerMsgs::Cancel, req.index, req.offset, req.length); -} - -size_t protocolSendRequest(tr_peerMsgsImpl* const msgs, struct peer_request const& req) -{ - TR_ASSERT(msgs->isValidRequest(req)); - return protocol_send_message(msgs, BtPeerMsgs::Request, req.index, req.offset, req.length); -} - -size_t protocolSendPort(tr_peerMsgsImpl* const msgs, tr_port port) -{ - return protocol_send_message(msgs, BtPeerMsgs::Port, port.host()); -} - -size_t protocolSendHave(tr_peerMsgsImpl* const msgs, tr_piece_index_t index) -{ - return protocol_send_message(msgs, BtPeerMsgs::Have, index); -} - -size_t protocolSendChoke(tr_peerMsgsImpl* const msgs, bool choke) -{ - return protocol_send_message(msgs, choke ? BtPeerMsgs::Choke : BtPeerMsgs::Unchoke); -} - -// --- INTEREST - -void sendInterest(tr_peerMsgsImpl* msgs, bool b) -{ - protocol_send_message(msgs, b ? BtPeerMsgs::Interested : BtPeerMsgs::NotInterested); -} - -std::optional popNextMetadataRequest(tr_peerMsgsImpl* msgs) -{ - auto& reqs = msgs->peerAskedForMetadata; - - if (std::empty(reqs)) - { - return {}; - } - - auto next = reqs.front(); - reqs.pop(); - return next; -} - -void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs) -{ - auto& queue = msgs->peer_requested_; - - if (auto const must_send_rej = msgs->io->supports_fext(); must_send_rej) - { - for (auto const& req : queue) - { - protocolSendReject(msgs, &req); - } - } - - queue.clear(); -} - // --- -void sendLtepHandshake(tr_peerMsgsImpl* msgs) +void tr_peerMsgsImpl::parse_ltep(MessageReader& payload) { - if (msgs->clientSentLtepHandshake) + TR_ASSERT(!std::empty(payload)); + + auto const ltep_msgid = payload.to_uint8(); + + if (ltep_msgid == LtepMessages::Handshake) { - return; - } + logtrace(this, "got ltep handshake"); + parse_ltep_handshake(payload); - logtrace(msgs, "sending an ltep handshake"); - msgs->clientSentLtepHandshake = true; - - /* decide if we want to advertise metadata xfer support (BEP 9) */ - bool const allow_metadata_xfer = msgs->torrent->is_public(); - - /* decide if we want to advertise pex support */ - bool const allow_pex = msgs->torrent->allows_pex(); - - auto val = tr_variant{}; - tr_variantInitDict(&val, 8); - tr_variantDictAddBool(&val, TR_KEY_e, msgs->session->encryptionMode() != TR_CLEAR_PREFERRED); - - // If connecting to global peer, then use global address - // Otherwise we are connecting to local peer, use bind address directly - if (auto const addr = msgs->io->address().is_global_unicast_address() ? msgs->session->global_address(TR_AF_INET) : - msgs->session->bind_address(TR_AF_INET); - addr && !addr->is_any()) - { - TR_ASSERT(addr->is_ipv4()); - tr_variantDictAddRaw(&val, TR_KEY_ipv4, &addr->addr.addr4, sizeof(addr->addr.addr4)); - } - if (auto const addr = msgs->io->address().is_global_unicast_address() ? msgs->session->global_address(TR_AF_INET6) : - msgs->session->bind_address(TR_AF_INET6); - addr && !addr->is_any()) - { - TR_ASSERT(addr->is_ipv6()); - tr_variantDictAddRaw(&val, TR_KEY_ipv6, &addr->addr.addr6, sizeof(addr->addr.addr6)); - } - - // http://bittorrent.org/beps/bep_0009.html - // It also adds "metadata_size" to the handshake message (not the - // "m" dictionary) specifying an integer value of the number of - // bytes of the metadata. - if (auto const info_dict_size = msgs->torrent->info_dict_size(); - allow_metadata_xfer && msgs->torrent->has_metainfo() && info_dict_size > 0) - { - tr_variantDictAddInt(&val, TR_KEY_metadata_size, info_dict_size); - } - - // http://bittorrent.org/beps/bep_0010.html - // Local TCP listen port. Allows each side to learn about the TCP - // port number of the other side. Note that there is no need for the - // receiving side of the connection to send this extension message, - // since its port number is already known. - tr_variantDictAddInt(&val, TR_KEY_p, msgs->session->advertisedPeerPort().host()); - - // http://bittorrent.org/beps/bep_0010.html - // An integer, the number of outstanding request messages this - // client supports without dropping any. The default in in - // libtorrent is 250. - tr_variantDictAddInt(&val, TR_KEY_reqq, ReqQ); - - // https://www.bittorrent.org/beps/bep_0010.html - // A string containing the compact representation of the ip address this peer sees - // you as. i.e. this is the receiver's external ip address (no port is included). - // This may be either an IPv4 (4 bytes) or an IPv6 (16 bytes) address. - { - auto buf = std::array{}; - auto const begin = std::data(buf); - auto const end = msgs->io->address().to_compact(begin); - auto const len = end - begin; - TR_ASSERT(len == tr_address::CompactAddrBytes[0] || len == tr_address::CompactAddrBytes[1]); - tr_variantDictAddRaw(&val, TR_KEY_yourip, begin, len); - } - - // http://bittorrent.org/beps/bep_0010.html - // Client name and version (as a utf-8 string). This is a much more - // reliable way of identifying the client than relying on the - // peer id encoding. - tr_variantDictAddStrView(&val, TR_KEY_v, TR_NAME " " USERAGENT_PREFIX); - - // http://bittorrent.org/beps/bep_0021.html - // A peer that is a partial seed SHOULD include an extra header in - // the extension handshake 'upload_only'. Setting the value of this - // key to 1 indicates that this peer is not interested in downloading - // anything. - tr_variantDictAddBool(&val, TR_KEY_upload_only, msgs->torrent->is_done()); - - if (allow_metadata_xfer || allow_pex) - { - tr_variant* m = tr_variantDictAddDict(&val, TR_KEY_m, 2); - - if (allow_metadata_xfer) + if (io_->supports_ltep()) { - tr_variantDictAddInt(m, TR_KEY_ut_metadata, UT_METADATA_ID); - } - - if (allow_pex) - { - tr_variantDictAddInt(m, TR_KEY_ut_pex, UT_PEX_ID); + send_ltep_handshake(); + send_ut_pex(); } } - - protocol_send_message(msgs, BtPeerMsgs::Ltep, LtepMessages::Handshake, tr_variant_serde::benc().to_string(val)); -} - -void parseLtepHandshake(tr_peerMsgsImpl* msgs, MessageReader& payload) -{ - auto const handshake_sv = payload.to_string_view(); - - auto var = tr_variant_serde::benc().inplace().parse(handshake_sv); - if (!var || !var->holds_alternative()) + else if (ltep_msgid == UT_PEX_ID) { - logtrace(msgs, "GET extended-handshake, couldn't get dictionary"); - return; + logtrace(this, "got ut pex"); + peer_supports_pex_ = true; + parse_ut_pex(payload); } - - logtrace(msgs, fmt::format("here is the base64-encoded handshake: [{:s}]", tr_base64_encode(handshake_sv))); - - /* does the peer prefer encrypted connections? */ - auto pex = tr_pex{}; - auto& [addr, port] = pex.socket_address; - if (auto e = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_e, &e)) + else if (ltep_msgid == UT_METADATA_ID) { - msgs->encryption_preference = e != 0 ? EncryptionPreference::Yes : EncryptionPreference::No; - - if (msgs->encryption_preference == EncryptionPreference::Yes) - { - pex.flags |= ADDED_F_ENCRYPTION_FLAG; - } + logtrace(this, "got ut metadata"); + peer_supports_metadata_xfer_ = true; + parse_ut_metadata(payload); } - - /* check supported messages for utorrent pex */ - msgs->peerSupportsPex = false; - msgs->peerSupportsMetadataXfer = false; - - if (tr_variant* sub = nullptr; tr_variantDictFindDict(&*var, TR_KEY_m, &sub)) + else { - if (auto ut_pex = int64_t{}; tr_variantDictFindInt(sub, TR_KEY_ut_pex, &ut_pex)) - { - msgs->peerSupportsPex = ut_pex != 0; - msgs->ut_pex_id = static_cast(ut_pex); - logtrace(msgs, fmt::format("msgs->ut_pex is {:d}", static_cast(msgs->ut_pex_id))); - } - - if (auto ut_metadata = int64_t{}; tr_variantDictFindInt(sub, TR_KEY_ut_metadata, &ut_metadata)) - { - msgs->peerSupportsMetadataXfer = ut_metadata != 0; - msgs->ut_metadata_id = static_cast(ut_metadata); - logtrace(msgs, fmt::format("msgs->ut_metadata_id is {:d}", static_cast(msgs->ut_metadata_id))); - } - - if (auto ut_holepunch = int64_t{}; tr_variantDictFindInt(sub, TR_KEY_ut_holepunch, &ut_holepunch)) - { - // Transmission doesn't support this extension yet. - // But its presence does indicate µTP supports, - // which we do care about... - msgs->peer_info->set_utp_supported(true); - } - } - - /* look for metainfo size (BEP 9) */ - if (auto metadata_size = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_metadata_size, &metadata_size)) - { - if (!tr_metadata_download::is_valid_metadata_size(metadata_size)) - { - msgs->peerSupportsMetadataXfer = false; - } - else - { - msgs->torrent->maybe_start_metadata_transfer(metadata_size); - } - } - - /* look for upload_only (BEP 21) */ - if (auto upload_only = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_upload_only, &upload_only)) - { - pex.flags |= ADDED_F_SEED_FLAG; - } - - // http://bittorrent.org/beps/bep_0010.html - // Client name and version (as a utf-8 string). This is a much more - // reliable way of identifying the client than relying on the - // peer id encoding. - if (auto sv = std::string_view{}; tr_variantDictFindStrView(&*var, TR_KEY_v, &sv)) - { - msgs->set_user_agent(tr_interned_string{ sv }); - } - - /* get peer's listening port */ - if (auto p = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_p, &p) && p > 0) - { - port.set_host(p); - msgs->publish(tr_peer_event::GotPort(port)); - logtrace(msgs, fmt::format("peer's port is now {:d}", p)); - } - - std::byte const* addr_compact = nullptr; - auto addr_len = size_t{}; - if (msgs->io->is_incoming() && tr_variantDictFindRaw(&*var, TR_KEY_ipv4, &addr_compact, &addr_len) && - addr_len == tr_address::CompactAddrBytes[TR_AF_INET]) - { - std::tie(addr, std::ignore) = tr_address::from_compact_ipv4(addr_compact); - tr_peerMgrAddPex(msgs->torrent, TR_PEER_FROM_LTEP, &pex, 1); - } - - if (msgs->io->is_incoming() && tr_variantDictFindRaw(&*var, TR_KEY_ipv6, &addr_compact, &addr_len) && - addr_len == tr_address::CompactAddrBytes[TR_AF_INET6]) - { - std::tie(addr, std::ignore) = tr_address::from_compact_ipv6(addr_compact); - tr_peerMgrAddPex(msgs->torrent, TR_PEER_FROM_LTEP, &pex, 1); - } - - /* get peer's maximum request queue size */ - if (auto reqq = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_reqq, &reqq)) - { - msgs->reqq = reqq; + logtrace(this, fmt::format("skipping unknown ltep message ({:d})", static_cast(ltep_msgid))); } } -void parseUtMetadata(tr_peerMsgsImpl* msgs, MessageReader& payload_in) +void tr_peerMsgsImpl::parse_ut_pex(MessageReader& payload) { - int64_t msg_type = -1; - int64_t piece = -1; - int64_t total_size = 0; - - auto const tmp = payload_in.to_string_view(); - auto const* const msg_end = std::data(tmp) + std::size(tmp); - - auto serde = tr_variant_serde::benc(); - if (auto var = serde.inplace().parse(tmp); var) - { - (void)tr_variantDictFindInt(&*var, TR_KEY_msg_type, &msg_type); - (void)tr_variantDictFindInt(&*var, TR_KEY_piece, &piece); - (void)tr_variantDictFindInt(&*var, TR_KEY_total_size, &total_size); - } - - logtrace(msgs, fmt::format("got ut_metadata msg: type {:d}, piece {:d}, total_size {:d}", msg_type, piece, total_size)); - - if (msg_type == MetadataMsgType::Reject) - { - /* NOOP */ - } - - if (auto const piece_len = msg_end - serde.end(); - msg_type == MetadataMsgType::Data && piece * MetadataPieceSize + piece_len <= total_size) - { - msgs->torrent->set_metadata_piece(piece, serde.end(), piece_len); - } - - if (msg_type == MetadataMsgType::Request) - { - if (piece >= 0 && msgs->torrent->has_metainfo() && msgs->torrent->is_public() && - std::size(msgs->peerAskedForMetadata) < MetadataReqQ) - { - msgs->peerAskedForMetadata.push(piece); - } - else - { - /* send a rejection message */ - auto v = tr_variant{}; - tr_variantInitDict(&v, 2); - tr_variantDictAddInt(&v, TR_KEY_msg_type, MetadataMsgType::Reject); - tr_variantDictAddInt(&v, TR_KEY_piece, piece); - protocol_send_message(msgs, BtPeerMsgs::Ltep, msgs->ut_metadata_id, serde.to_string(v)); - } - } -} - -void parseUtPex(tr_peerMsgsImpl* msgs, MessageReader& payload) -{ - auto* const tor = msgs->torrent; - if (!tor->allows_pex()) + if (!tor_.allows_pex()) { return; } @@ -1217,7 +954,7 @@ void parseUtPex(tr_peerMsgsImpl* msgs, MessageReader& payload) auto pex = tr_pex::from_compact_ipv4(added, added_len, added_f, added_f_len); pex.resize(std::min(MaxPexPeerCount, std::size(pex))); - tr_peerMgrAddPex(tor, TR_PEER_FROM_PEX, std::data(pex), std::size(pex)); + tr_peerMgrAddPex(&tor_, TR_PEER_FROM_PEX, std::data(pex), std::size(pex)); } if (tr_variantDictFindRaw(&*var, TR_KEY_added6, &added, &added_len)) @@ -1232,758 +969,24 @@ void parseUtPex(tr_peerMsgsImpl* msgs, MessageReader& payload) auto pex = tr_pex::from_compact_ipv6(added, added_len, added_f, added_f_len); pex.resize(std::min(MaxPexPeerCount, std::size(pex))); - tr_peerMgrAddPex(tor, TR_PEER_FROM_PEX, std::data(pex), std::size(pex)); + tr_peerMgrAddPex(&tor_, TR_PEER_FROM_PEX, std::data(pex), std::size(pex)); } } } -void parseLtep(tr_peerMsgsImpl* msgs, MessageReader& payload) -{ - TR_ASSERT(!std::empty(payload)); - - auto const ltep_msgid = payload.to_uint8(); - - if (ltep_msgid == LtepMessages::Handshake) - { - logtrace(msgs, "got ltep handshake"); - parseLtepHandshake(msgs, payload); - - if (msgs->io->supports_ltep()) - { - sendLtepHandshake(msgs); - msgs->sendPex(); - } - } - else if (ltep_msgid == UT_PEX_ID) - { - logtrace(msgs, "got ut pex"); - msgs->peerSupportsPex = true; - parseUtPex(msgs, payload); - } - else if (ltep_msgid == UT_METADATA_ID) - { - logtrace(msgs, "got ut metadata"); - msgs->peerSupportsMetadataXfer = true; - parseUtMetadata(msgs, payload); - } - else - { - logtrace(msgs, fmt::format("skipping unknown ltep message ({:d})", static_cast(ltep_msgid))); - } -} - -ReadResult process_peer_message(tr_peerMsgsImpl* msgs, uint8_t id, MessageReader& payload); - -[[nodiscard]] bool canAddRequestFromPeer(tr_peerMsgsImpl const* const msgs, struct peer_request const& req) -{ - if (msgs->peer_is_choked()) - { - logtrace(msgs, "rejecting request from choked peer"); - return false; - } - - if (std::size(msgs->peer_requested_) >= ReqQ) - { - logtrace(msgs, "rejecting request ... reqq is full"); - return false; - } - - if (!is_valid_request(*msgs->torrent, req.index, req.offset, req.length)) - { - logtrace(msgs, "rejecting an invalid request."); - return false; - } - - if (!msgs->torrent->has_piece(req.index)) - { - logtrace(msgs, "rejecting request for a piece we don't have."); - return false; - } - - return true; -} - -void peerMadeRequest(tr_peerMsgsImpl* msgs, struct peer_request const* req) -{ - if (canAddRequestFromPeer(msgs, *req)) - { - msgs->peer_requested_.emplace_back(*req); - } - else if (msgs->io->supports_fext()) - { - protocolSendReject(msgs, req); - } -} - -int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr block_data, tr_block_index_t block); - -ReadResult read_piece_data(tr_peerMsgsImpl* msgs, MessageReader& payload) -{ - // - auto const piece = payload.to_uint32(); - auto const offset = payload.to_uint32(); - auto const len = std::size(payload); - - auto const loc = msgs->torrent->piece_loc(piece, offset); - auto const block = loc.block; - auto const block_size = msgs->torrent->block_size(block); - - logtrace(msgs, fmt::format("got {:d} bytes for req {:d}:{:d}->{:d}", len, piece, offset, len)); - - if (loc.block_offset + len > block_size) - { - logwarn(msgs, fmt::format("got unaligned piece {:d}:{:d}->{:d}", piece, offset, len)); - return { READ_ERR, len }; - } - - if (!tr_peerMgrDidPeerRequest(msgs->torrent, msgs, block)) - { - logwarn(msgs, fmt::format("got unrequested piece {:d}:{:d}->{:d}", piece, offset, len)); - return { READ_ERR, len }; - } - - msgs->publish(tr_peer_event::GotPieceData(len)); - - if (loc.block_offset == 0U && len == block_size) // simple case: one message has entire block - { - auto buf = std::make_unique(block_size); - payload.to_buf(std::data(*buf), len); - auto const ok = clientGotBlock(msgs, std::move(buf), block) == 0; - return { ok ? READ_NOW : READ_ERR, len }; - } - - auto& blocks = msgs->incoming.blocks; - auto& incoming_block = blocks.try_emplace(block, block_size).first->second; - payload.to_buf(std::data(*incoming_block.buf) + loc.block_offset, len); - - if (!incoming_block.add_span(loc.block_offset, loc.block_offset + len)) - { - return { READ_ERR, len }; // invalid span - } - - if (!incoming_block.has_all()) - { - return { READ_LATER, len }; // we don't have the full block yet - } - - auto block_buf = std::move(incoming_block.buf); - blocks.erase(block); // note: invalidates `incoming_block` local - auto const ok = clientGotBlock(msgs, std::move(block_buf), block) == 0; - return { ok ? READ_NOW : READ_ERR, len }; -} - -ReadResult process_peer_message(tr_peerMsgsImpl* msgs, uint8_t id, MessageReader& payload) -{ - bool const fext = msgs->io->supports_fext(); - - auto ui32 = uint32_t{}; - - logtrace( - msgs, - fmt::format( - "got peer msg '{:s}' ({:d}) with payload len {:d}", - BtPeerMsgs::debug_name(id), - static_cast(id), - std::size(payload))); - - if (!messageLengthIsCorrect(msgs->torrent, id, sizeof(id) + std::size(payload))) - { - logdbg( - msgs, - fmt::format( - "bad msg: '{:s}' ({:d}) with payload len {:d}", - BtPeerMsgs::debug_name(id), - static_cast(id), - std::size(payload))); - msgs->publish(tr_peer_event::GotError(EMSGSIZE)); - return { READ_ERR, {} }; - } - - switch (id) - { - case BtPeerMsgs::Choke: - logtrace(msgs, "got Choke"); - msgs->set_client_choked(true); - - if (!fext) - { - msgs->publish(tr_peer_event::GotChoke()); - } - - msgs->update_active(TR_PEER_TO_CLIENT); - break; - - case BtPeerMsgs::Unchoke: - logtrace(msgs, "got Unchoke"); - msgs->set_client_choked(false); - msgs->update_active(TR_PEER_TO_CLIENT); - updateDesiredRequestCount(msgs); - break; - - case BtPeerMsgs::Interested: - logtrace(msgs, "got Interested"); - msgs->set_peer_interested(true); - msgs->update_active(TR_CLIENT_TO_PEER); - break; - - case BtPeerMsgs::NotInterested: - logtrace(msgs, "got Not Interested"); - msgs->set_peer_interested(false); - msgs->update_active(TR_CLIENT_TO_PEER); - break; - - case BtPeerMsgs::Have: - ui32 = payload.to_uint32(); - logtrace(msgs, fmt::format("got Have: {:d}", ui32)); - - if (msgs->torrent->has_metainfo() && ui32 >= msgs->torrent->piece_count()) - { - msgs->publish(tr_peer_event::GotError(ERANGE)); - return { READ_ERR, {} }; - } - - /* a peer can send the same HAVE message twice... */ - if (!msgs->have_.test(ui32)) - { - msgs->have_.set(ui32); - msgs->publish(tr_peer_event::GotHave(ui32)); - } - - msgs->invalidatePercentDone(); - break; - - case BtPeerMsgs::Bitfield: - logtrace(msgs, "got a bitfield"); - msgs->have_ = tr_bitfield{ msgs->torrent->has_metainfo() ? msgs->torrent->piece_count() : std::size(payload) * 8 }; - msgs->have_.set_raw(reinterpret_cast(std::data(payload)), std::size(payload)); - msgs->publish(tr_peer_event::GotBitfield(&msgs->have_)); - msgs->invalidatePercentDone(); - break; - - case BtPeerMsgs::Request: - { - struct peer_request r; - r.index = payload.to_uint32(); - r.offset = payload.to_uint32(); - r.length = payload.to_uint32(); - logtrace(msgs, fmt::format("got Request: {:d}:{:d}->{:d}", r.index, r.offset, r.length)); - peerMadeRequest(msgs, &r); - break; - } - - case BtPeerMsgs::Cancel: - { - struct peer_request r; - r.index = payload.to_uint32(); - r.offset = payload.to_uint32(); - r.length = payload.to_uint32(); - msgs->cancels_sent_to_client.add(tr_time(), 1); - logtrace(msgs, fmt::format("got a Cancel {:d}:{:d}->{:d}", r.index, r.offset, r.length)); - - auto& requests = msgs->peer_requested_; - if (auto iter = std::find(std::begin(requests), std::end(requests), r); iter != std::end(requests)) - { - requests.erase(iter); - - // bep6: "Even when a request is cancelled, the peer - // receiving the cancel should respond with either the - // corresponding reject or the corresponding piece" - if (fext) - { - protocolSendReject(msgs, &r); - } - } - break; - } - - case BtPeerMsgs::Piece: - return read_piece_data(msgs, payload); - break; - - case BtPeerMsgs::Port: - // http://bittorrent.org/beps/bep_0005.html - // Peers supporting the DHT set the last bit of the 8-byte reserved flags - // exchanged in the BitTorrent protocol handshake. Peer receiving a handshake - // indicating the remote peer supports the DHT should send a PORT message. - // It begins with byte 0x09 and has a two byte payload containing the UDP - // port of the DHT node in network byte order. - { - logtrace(msgs, "Got a BtPeerMsgs::Port"); - - auto const hport = payload.to_uint16(); - if (auto const dht_port = tr_port::from_host(hport); !std::empty(dht_port)) - { - msgs->dht_port = dht_port; - msgs->session->maybe_add_dht_node(msgs->io->address(), msgs->dht_port); - } - } - break; - - case BtPeerMsgs::FextSuggest: - logtrace(msgs, "Got a BtPeerMsgs::FextSuggest"); - - if (fext) - { - auto const piece = payload.to_uint32(); - msgs->publish(tr_peer_event::GotSuggest(piece)); - } - else - { - msgs->publish(tr_peer_event::GotError(EMSGSIZE)); - return { READ_ERR, {} }; - } - - break; - - case BtPeerMsgs::FextAllowedFast: - logtrace(msgs, "Got a BtPeerMsgs::FextAllowedFast"); - - if (fext) - { - auto const piece = payload.to_uint32(); - msgs->publish(tr_peer_event::GotAllowedFast(piece)); - } - else - { - msgs->publish(tr_peer_event::GotError(EMSGSIZE)); - return { READ_ERR, {} }; - } - - break; - - case BtPeerMsgs::FextHaveAll: - logtrace(msgs, "Got a BtPeerMsgs::FextHaveAll"); - - if (fext) - { - msgs->have_.set_has_all(); - msgs->publish(tr_peer_event::GotHaveAll()); - msgs->invalidatePercentDone(); - } - else - { - msgs->publish(tr_peer_event::GotError(EMSGSIZE)); - return { READ_ERR, {} }; - } - - break; - - case BtPeerMsgs::FextHaveNone: - logtrace(msgs, "Got a BtPeerMsgs::FextHaveNone"); - - if (fext) - { - msgs->have_.set_has_none(); - msgs->publish(tr_peer_event::GotHaveNone()); - msgs->invalidatePercentDone(); - } - else - { - msgs->publish(tr_peer_event::GotError(EMSGSIZE)); - return { READ_ERR, {} }; - } - - break; - - case BtPeerMsgs::FextReject: - { - struct peer_request r; - r.index = payload.to_uint32(); - r.offset = payload.to_uint32(); - r.length = payload.to_uint32(); - - if (fext) - { - msgs->publish( - tr_peer_event::GotRejected(msgs->torrent->block_info(), msgs->torrent->piece_loc(r.index, r.offset).block)); - } - else - { - msgs->publish(tr_peer_event::GotError(EMSGSIZE)); - return { READ_ERR, {} }; - } - - break; - } - - case BtPeerMsgs::Ltep: - logtrace(msgs, "Got a BtPeerMsgs::Ltep"); - parseLtep(msgs, payload); - break; - - default: - logtrace(msgs, fmt::format("peer sent us an UNKNOWN: {:d}", static_cast(id))); - break; - } - - return { READ_NOW, {} }; -} - -/* returns 0 on success, or an errno on failure */ -int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr block_data, tr_block_index_t const block) -{ - TR_ASSERT(msgs != nullptr); - - tr_torrent const* const tor = msgs->torrent; - auto const n_expected = msgs->torrent->block_size(block); - - if (!block_data) - { - logdbg(msgs, fmt::format("wrong block size: expected {:d}, got {:d}", n_expected, 0)); - return EMSGSIZE; - } - - if (std::size(*block_data) != msgs->torrent->block_size(block)) - { - logdbg(msgs, fmt::format("wrong block size: expected {:d}, got {:d}", n_expected, std::size(*block_data))); - return EMSGSIZE; - } - - logtrace(msgs, fmt::format("got block {:d}", block)); - - if (!tr_peerMgrDidPeerRequest(msgs->torrent, msgs, block)) - { - logdbg(msgs, "we didn't ask for this message..."); - return 0; - } - - auto const loc = msgs->torrent->block_loc(block); - if (msgs->torrent->has_piece(loc.piece)) - { - logtrace(msgs, "we did ask for this message, but the piece is already complete..."); - return 0; - } - - // NB: if writeBlock() fails the torrent may be paused. - // If this happens, `msgs` will be a dangling pointer and must no longer be used. - if (auto const err = msgs->session->cache->write_block(tor->id(), block, std::move(block_data)); err != 0) - { - return err; - } - - msgs->blame.set(loc.piece); - msgs->publish(tr_peer_event::GotBlock(tor->block_info(), block)); - - return 0; -} - -void didWrite(tr_peerIo* /*io*/, size_t bytes_written, bool was_piece_data, void* vmsgs) -{ - auto* const msgs = static_cast(vmsgs); - - if (was_piece_data) - { - msgs->publish(tr_peer_event::SentPieceData(bytes_written)); - } - - peerPulse(msgs); -} - -ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece) -{ - auto* msgs = static_cast(vmsgs); - - // https://www.bittorrent.org/beps/bep_0003.html - // Next comes an alternating stream of length prefixes and messages. - // Messages of length zero are keepalives, and ignored. - // All non-keepalive messages start with a single byte which gives their type. - // - // https://wiki.theory.org/BitTorrentSpecification - // All of the remaining messages in the protocol take the form of - // . The length prefix is a four byte - // big-endian value. The message ID is a single decimal byte. - // The payload is message dependent. - - // read - auto& current_message_len = msgs->incoming.length; // the full message payload length. Includes the +1 for id length - if (!current_message_len) - { - auto message_len = uint32_t{}; - if (io->read_buffer_size() < sizeof(message_len)) - { - return READ_LATER; - } - - io->read_uint32(&message_len); - - // The keep-alive message is a message with zero bytes, - // specified with the length prefix set to zero. - // There is no message ID and no payload. - if (message_len == 0U) - { - logtrace(msgs, "got KeepAlive"); - return READ_NOW; - } - - current_message_len = message_len; - } - - // read - auto& current_message_type = msgs->incoming.id; - if (!current_message_type) - { - auto message_type = uint8_t{}; - if (io->read_buffer_size() < sizeof(message_type)) - { - return READ_LATER; - } - - io->read_uint8(&message_type); - current_message_type = message_type; - } - - // read - auto& current_payload = msgs->incoming.payload; - auto const full_payload_len = *current_message_len - sizeof(*current_message_type); - auto n_left = full_payload_len - std::size(current_payload); - auto const [buf, n_this_pass] = current_payload.reserve_space(std::min(n_left, io->read_buffer_size())); - io->read_bytes(buf, n_this_pass); - current_payload.commit_space(n_this_pass); - n_left -= n_this_pass; - logtrace(msgs, fmt::format("read {:d} payload bytes; {:d} left to go", n_this_pass, n_left)); - - if (n_left > 0U) - { - return READ_LATER; - } - - // The incoming message is now complete. After processing the message - // with `process_peer_message()`, reset the peerMsgs' incoming - // field so it's ready to receive the next message. - - auto const [read_state, n_piece_bytes_read] = process_peer_message(msgs, *current_message_type, current_payload); - *piece = n_piece_bytes_read; - - current_message_len.reset(); - current_message_type.reset(); - current_payload.clear(); - - return read_state; -} - -// --- - -void updateDesiredRequestCount(tr_peerMsgsImpl* msgs) -{ - msgs->desired_request_count = msgs->canRequest().max_blocks; -} - -void updateMetadataRequests(tr_peerMsgsImpl* msgs, time_t now) -{ - if (!msgs->peerSupportsMetadataXfer) - { - return; - } - - if (auto const piece = msgs->torrent->get_next_metadata_request(now); piece) - { - auto tmp = tr_variant{}; - tr_variantInitDict(&tmp, 3); - tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Request); - tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece); - protocol_send_message(msgs, BtPeerMsgs::Ltep, msgs->ut_metadata_id, tr_variant_serde::benc().to_string(tmp)); - } -} - -void updateBlockRequests(tr_peerMsgsImpl* msgs) -{ - auto* const tor = msgs->torrent; - - if (!tor->client_can_download()) - { - return; - } - - auto const n_active = tr_peerMgrCountActiveRequestsToPeer(tor, msgs); - if (n_active >= msgs->desired_request_count) - { - return; - } - - auto const n_wanted = msgs->desired_request_count - n_active; - if (n_wanted == 0) - { - return; - } - - TR_ASSERT(msgs->client_is_interested()); - TR_ASSERT(!msgs->client_is_choked()); - - if (auto const requests = tr_peerMgrGetNextRequests(tor, msgs, n_wanted); !std::empty(requests)) - { - msgs->requestBlocks(std::data(requests), std::size(requests)); - } -} - -namespace peer_pulse_helpers -{ -[[nodiscard]] size_t add_next_metadata_piece(tr_peerMsgsImpl* msgs) -{ - auto const piece = popNextMetadataRequest(msgs); - - if (!piece.has_value()) // no pending requests - { - return {}; - } - - auto data = msgs->torrent->get_metadata_piece(*piece); - if (!data) - { - // send a reject - auto tmp = tr_variant{}; - tr_variantInitDict(&tmp, 2); - tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Reject); - tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece); - return protocol_send_message(msgs, BtPeerMsgs::Ltep, msgs->ut_metadata_id, tr_variant_serde::benc().to_string(tmp)); - } - - // send the metadata - auto tmp = tr_variant{}; - tr_variantInitDict(&tmp, 3); - tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Data); - tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece); - tr_variantDictAddInt(&tmp, TR_KEY_total_size, msgs->torrent->info_dict_size()); - return protocol_send_message(msgs, BtPeerMsgs::Ltep, msgs->ut_metadata_id, tr_variant_serde::benc().to_string(tmp), *data); -} - -[[nodiscard]] size_t add_next_piece(tr_peerMsgsImpl* msgs, uint64_t now) -{ - if (msgs->io->get_write_buffer_space(now) == 0U || std::empty(msgs->peer_requested_)) - { - return {}; - } - - auto const req = msgs->peer_requested_.front(); - msgs->peer_requested_.erase(std::begin(msgs->peer_requested_)); - - auto buf = std::array{}; - auto ok = msgs->isValidRequest(req) && msgs->torrent->has_piece(req.index); - - if (ok) - { - ok = msgs->torrent->ensure_piece_is_checked(req.index); - - if (!ok) - { - msgs->torrent->error().set_local_error(fmt::format("Please Verify Local Data! Piece #{:d} is corrupt.", req.index)); - } - } - - if (ok) - { - ok = msgs->session->cache - ->read_block(*msgs->torrent, msgs->torrent->piece_loc(req.index, req.offset), req.length, std::data(buf)) == 0; - } - - if (ok) - { - auto const piece_data = std::string_view{ reinterpret_cast(std::data(buf)), req.length }; - return protocol_send_message(msgs, BtPeerMsgs::Piece, req.index, req.offset, piece_data); - } - - if (msgs->io->supports_fext()) - { - return protocolSendReject(msgs, &req); - } - - return {}; -} - -[[nodiscard]] size_t fill_output_buffer(tr_peerMsgsImpl* msgs, time_t now_sec, uint64_t now_msec) -{ - auto n_bytes_written = size_t{}; - - // fulfuill metadata requests - for (;;) - { - auto const old_len = n_bytes_written; - n_bytes_written += add_next_metadata_piece(msgs); - if (old_len == n_bytes_written) - { - break; - } - } - - // fulfuill piece requests - for (;;) - { - auto const old_len = n_bytes_written; - n_bytes_written += add_next_piece(msgs, now_msec); - if (old_len == n_bytes_written) - { - break; - } - } - - if (msgs != nullptr && msgs->clientSentAnythingAt != 0 && now_sec - msgs->clientSentAnythingAt > KeepaliveIntervalSecs) - { - n_bytes_written += protocol_send_keepalive(msgs); - } - - return n_bytes_written; -} -} // namespace peer_pulse_helpers - -void peerPulse(void* vmsgs) -{ - using namespace peer_pulse_helpers; - - auto* msgs = static_cast(vmsgs); - auto const now_sec = tr_time(); - auto const now_msec = tr_time_msec(); - - updateDesiredRequestCount(msgs); - updateBlockRequests(msgs); - updateMetadataRequests(msgs, now_sec); - - for (;;) - { - if (fill_output_buffer(msgs, now_sec, now_msec) == 0U) - { - break; - } - } -} - -void gotError(tr_peerIo* /*io*/, tr_error const& /*error*/, void* vmsgs) -{ - static_cast(vmsgs)->publish(tr_peer_event::GotError(ENOTCONN)); -} - -void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs) -{ - bool const fext = msgs->io->supports_fext(); - - if (fext && msgs->torrent->has_all()) - { - protocol_send_message(msgs, BtPeerMsgs::FextHaveAll); - } - else if (fext && msgs->torrent->has_none()) - { - protocol_send_message(msgs, BtPeerMsgs::FextHaveNone); - } - else if (!msgs->torrent->has_none()) - { - protocol_send_message(msgs, BtPeerMsgs::Bitfield, msgs->torrent->create_piece_bitfield()); - } -} - -void tr_peerMsgsImpl::sendPex() +void tr_peerMsgsImpl::send_ut_pex() { // only send pex if both the torrent and peer support it - if (!this->peerSupportsPex || !this->torrent->allows_pex()) + if (!peer_supports_pex_ || !tor_.allows_pex()) { return; } - static auto constexpr MaxPexAdded = size_t{ 50 }; - static auto constexpr MaxPexDropped = size_t{ 50 }; - - auto val = tr_variant{}; - tr_variantInitDict(&val, 3); /* ipv6 support: left as 3: speed vs. likelihood? */ - - auto tmpbuf = small::vector{}; + static auto constexpr MaxPexAdded = size_t{ 50U }; + static auto constexpr MaxPexDropped = size_t{ 50U }; + auto map = tr_variant::Map{ 4U }; + auto tmpbuf = small::vector{}; for (uint8_t i = 0; i < NUM_TR_AF_INET_TYPES; ++i) { static auto constexpr AddedMap = std::array{ TR_KEY_added, TR_KEY_added6 }; @@ -1991,8 +994,8 @@ void tr_peerMsgsImpl::sendPex() static auto constexpr DroppedMap = std::array{ TR_KEY_dropped, TR_KEY_dropped6 }; auto const ip_type = static_cast(i); - auto& old_pex = pex[i]; - auto new_pex = tr_peerMgrGetPeers(this->torrent, ip_type, TR_PEERS_CONNECTED, MaxPexPeerCount); + auto& old_pex = pex_[i]; + auto new_pex = tr_peerMgrGetPeers(&tor_, ip_type, TR_PEERS_CONNECTED, MaxPexPeerCount); auto added = std::vector{}; added.reserve(std::size(new_pex)); std::set_difference( @@ -2043,7 +1046,7 @@ void tr_peerMsgsImpl::sendPex() tmpbuf.reserve(std::size(added) * tr_socket_address::CompactSockAddrBytes[i]); tr_pex::to_compact(std::back_inserter(tmpbuf), std::data(added), std::size(added)); TR_ASSERT(std::size(tmpbuf) == std::size(added) * tr_socket_address::CompactSockAddrBytes[i]); - tr_variantDictAddRaw(&val, AddedMap[i], std::data(tmpbuf), std::size(tmpbuf)); + map.try_emplace(AddedMap[i], std::string_view{ reinterpret_cast(std::data(tmpbuf)), std::size(tmpbuf) }); // "added.f" tmpbuf.resize(std::size(added)); @@ -2054,8 +1057,9 @@ void tr_peerMsgsImpl::sendPex() *walk++ = std::byte{ p.flags }; } - TR_ASSERT(static_cast(walk - begin) == std::size(added)); - tr_variantDictAddRaw(&val, AddedFMap[i], begin, walk - begin); + auto const f_len = static_cast(walk - begin); + TR_ASSERT(f_len == std::size(added)); + map.try_emplace(AddedFMap[i], std::string_view{ reinterpret_cast(begin), f_len }); } if (!std::empty(dropped)) @@ -2065,17 +1069,991 @@ void tr_peerMsgsImpl::sendPex() tmpbuf.reserve(std::size(dropped) * tr_socket_address::CompactSockAddrBytes[i]); tr_pex::to_compact(std::back_inserter(tmpbuf), std::data(dropped), std::size(dropped)); TR_ASSERT(std::size(tmpbuf) == std::size(dropped) * tr_socket_address::CompactSockAddrBytes[i]); - tr_variantDictAddRaw(&val, DroppedMap[i], std::data(tmpbuf), std::size(tmpbuf)); + map.try_emplace(DroppedMap[i], std::string_view{ reinterpret_cast(std::data(tmpbuf)), std::size(tmpbuf) }); } } - protocol_send_message(this, BtPeerMsgs::Ltep, this->ut_pex_id, tr_variant_serde::benc().to_string(val)); + protocol_send_message(BtPeerMsgs::Ltep, ut_pex_id_, tr_variant_serde::benc().to_string(tr_variant{ std::move(map) })); +} + +void tr_peerMsgsImpl::send_ltep_handshake() +{ + if (client_sent_ltep_handshake_) + { + return; + } + + logtrace(this, "sending an ltep handshake"); + client_sent_ltep_handshake_ = true; + + /* decide if we want to advertise metadata xfer support (BEP 9) */ + bool const allow_metadata_xfer = tor_.is_public(); + + /* decide if we want to advertise pex support */ + bool const allow_pex = tor_.allows_pex(); + + auto val = tr_variant{}; + tr_variantInitDict(&val, 8); + tr_variantDictAddBool(&val, TR_KEY_e, session->encryptionMode() != TR_CLEAR_PREFERRED); + + // If connecting to global peer, then use global address + // Otherwise we are connecting to local peer, use bind address directly + if (auto const addr = io_->address().is_global_unicast_address() ? session->global_address(TR_AF_INET) : + session->bind_address(TR_AF_INET); + addr && !addr->is_any()) + { + TR_ASSERT(addr->is_ipv4()); + tr_variantDictAddRaw(&val, TR_KEY_ipv4, &addr->addr.addr4, sizeof(addr->addr.addr4)); + } + if (auto const addr = io_->address().is_global_unicast_address() ? session->global_address(TR_AF_INET6) : + session->bind_address(TR_AF_INET6); + addr && !addr->is_any()) + { + TR_ASSERT(addr->is_ipv6()); + tr_variantDictAddRaw(&val, TR_KEY_ipv6, &addr->addr.addr6, sizeof(addr->addr.addr6)); + } + + // https://www.bittorrent.org/beps/bep_0009.html + // It also adds "metadata_size" to the handshake message (not the + // "m" dictionary) specifying an integer value of the number of + // bytes of the metadata. + if (auto const info_dict_size = tor_.info_dict_size(); allow_metadata_xfer && tor_.has_metainfo() && info_dict_size > 0) + { + tr_variantDictAddInt(&val, TR_KEY_metadata_size, info_dict_size); + } + + // https://www.bittorrent.org/beps/bep_0010.html + // Local TCP listen port. Allows each side to learn about the TCP + // port number of the other side. Note that there is no need for the + // receiving side of the connection to send this extension message, + // since its port number is already known. + tr_variantDictAddInt(&val, TR_KEY_p, session->advertisedPeerPort().host()); + + // https://www.bittorrent.org/beps/bep_0010.html + // An integer, the number of outstanding request messages this + // client supports without dropping any. The default in in + // libtorrent is 250. + tr_variantDictAddInt(&val, TR_KEY_reqq, ReqQ); + + // https://www.bittorrent.org/beps/bep_0010.html + // A string containing the compact representation of the ip address this peer sees + // you as. i.e. this is the receiver's external ip address (no port is included). + // This may be either an IPv4 (4 bytes) or an IPv6 (16 bytes) address. + { + auto buf = std::array{}; + auto const begin = std::data(buf); + auto const end = io_->address().to_compact(begin); + auto const len = end - begin; + TR_ASSERT(len == tr_address::CompactAddrBytes[0] || len == tr_address::CompactAddrBytes[1]); + tr_variantDictAddRaw(&val, TR_KEY_yourip, begin, len); + } + + // https://www.bittorrent.org/beps/bep_0010.html + // Client name and version (as a utf-8 string). This is a much more + // reliable way of identifying the client than relying on the + // peer id encoding. + tr_variantDictAddStrView(&val, TR_KEY_v, TR_NAME " " USERAGENT_PREFIX); + + // https://www.bittorrent.org/beps/bep_0021.html + // A peer that is a partial seed SHOULD include an extra header in + // the extension handshake 'upload_only'. Setting the value of this + // key to 1 indicates that this peer is not interested in downloading + // anything. + tr_variantDictAddBool(&val, TR_KEY_upload_only, tor_.is_done()); + + if (allow_metadata_xfer || allow_pex) + { + tr_variant* m = tr_variantDictAddDict(&val, TR_KEY_m, 2); + + if (allow_metadata_xfer) + { + tr_variantDictAddInt(m, TR_KEY_ut_metadata, UT_METADATA_ID); + } + + if (allow_pex) + { + tr_variantDictAddInt(m, TR_KEY_ut_pex, UT_PEX_ID); + } + } + + protocol_send_message(BtPeerMsgs::Ltep, LtepMessages::Handshake, tr_variant_serde::benc().to_string(val)); +} + +void tr_peerMsgsImpl::parse_ltep_handshake(MessageReader& payload) +{ + auto const handshake_sv = payload.to_string_view(); + + auto var = tr_variant_serde::benc().inplace().parse(handshake_sv); + if (!var || !var->holds_alternative()) + { + logtrace(this, "GET extended-handshake, couldn't get dictionary"); + return; + } + + logtrace(this, fmt::format("here is the base64-encoded handshake: [{:s}]", tr_base64_encode(handshake_sv))); + + // does the peer prefer encrypted connections? + auto pex = tr_pex{}; + auto& [addr, port] = pex.socket_address; + if (auto e = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_e, &e)) + { + encryption_preference_ = e != 0 ? EncryptionPreference::Yes : EncryptionPreference::No; + + if (encryption_preference_ == EncryptionPreference::Yes) + { + pex.flags |= ADDED_F_ENCRYPTION_FLAG; + } + } + + // check supported messages for utorrent pex + peer_supports_pex_ = false; + peer_supports_metadata_xfer_ = false; + + if (tr_variant* sub = nullptr; tr_variantDictFindDict(&*var, TR_KEY_m, &sub)) + { + if (auto ut_pex = int64_t{}; tr_variantDictFindInt(sub, TR_KEY_ut_pex, &ut_pex)) + { + peer_supports_pex_ = ut_pex != 0; + ut_pex_id_ = static_cast(ut_pex); + logtrace(this, fmt::format("msgs->ut_pex is {:d}", ut_pex_id_)); + } + + if (auto ut_metadata = int64_t{}; tr_variantDictFindInt(sub, TR_KEY_ut_metadata, &ut_metadata)) + { + peer_supports_metadata_xfer_ = ut_metadata != 0; + ut_metadata_id_ = static_cast(ut_metadata); + logtrace(this, fmt::format("msgs->ut_metadata_id_ is {:d}", ut_metadata_id_)); + } + + if (auto ut_holepunch = int64_t{}; tr_variantDictFindInt(sub, TR_KEY_ut_holepunch, &ut_holepunch)) + { + // Transmission doesn't support this extension yet. + // But its presence does indicate µTP supports, + // which we do care about... + peer_info->set_utp_supported(true); + } + } + + // look for metainfo size (BEP 9) + if (auto metadata_size = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_metadata_size, &metadata_size)) + { + if (!tr_metadata_download::is_valid_metadata_size(metadata_size)) + { + peer_supports_metadata_xfer_ = false; + } + else + { + tor_.maybe_start_metadata_transfer(metadata_size); + } + } + + // look for upload_only (BEP 21) + if (auto upload_only = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_upload_only, &upload_only)) + { + pex.flags |= ADDED_F_SEED_FLAG; + } + + // https://www.bittorrent.org/beps/bep_0010.html + // Client name and version (as a utf-8 string). This is a much more + // reliable way of identifying the client than relying on the + // peer id encoding. + if (auto sv = std::string_view{}; tr_variantDictFindStrView(&*var, TR_KEY_v, &sv)) + { + set_user_agent(tr_interned_string{ sv }); + } + + /* get peer's listening port */ + if (auto p = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_p, &p) && p > 0) + { + port.set_host(p); + publish(tr_peer_event::GotPort(port)); + logtrace(this, fmt::format("peer's port is now {:d}", p)); + } + + std::byte const* addr_compact = nullptr; + auto addr_len = size_t{}; + if (io_->is_incoming() && tr_variantDictFindRaw(&*var, TR_KEY_ipv4, &addr_compact, &addr_len) && + addr_len == tr_address::CompactAddrBytes[TR_AF_INET]) + { + std::tie(addr, std::ignore) = tr_address::from_compact_ipv4(addr_compact); + tr_peerMgrAddPex(&tor_, TR_PEER_FROM_LTEP, &pex, 1); + } + + if (io_->is_incoming() && tr_variantDictFindRaw(&*var, TR_KEY_ipv6, &addr_compact, &addr_len) && + addr_len == tr_address::CompactAddrBytes[TR_AF_INET6]) + { + std::tie(addr, std::ignore) = tr_address::from_compact_ipv6(addr_compact); + tr_peerMgrAddPex(&tor_, TR_PEER_FROM_LTEP, &pex, 1); + } + + /* get peer's maximum request queue size */ + if (auto reqq_in = int64_t{}; tr_variantDictFindInt(&*var, TR_KEY_reqq, &reqq_in)) + { + reqq_ = reqq_in; + } +} + +void tr_peerMsgsImpl::parse_ut_metadata(MessageReader& payload_in) +{ + int64_t msg_type = -1; + int64_t piece = -1; + int64_t total_size = 0; + + auto const tmp = payload_in.to_string_view(); + auto const* const msg_end = std::data(tmp) + std::size(tmp); + + auto serde = tr_variant_serde::benc(); + if (auto var = serde.inplace().parse(tmp); var) + { + (void)tr_variantDictFindInt(&*var, TR_KEY_msg_type, &msg_type); + (void)tr_variantDictFindInt(&*var, TR_KEY_piece, &piece); + (void)tr_variantDictFindInt(&*var, TR_KEY_total_size, &total_size); + } + + logtrace(this, fmt::format("got ut_metadata msg: type {:d}, piece {:d}, total_size {:d}", msg_type, piece, total_size)); + + if (msg_type == MetadataMsgType::Reject) + { + // no-op + } + + if (auto const piece_len = msg_end - serde.end(); + msg_type == MetadataMsgType::Data && piece * MetadataPieceSize + piece_len <= total_size) + { + tor_.set_metadata_piece(piece, serde.end(), piece_len); + } + + if (msg_type == MetadataMsgType::Request) + { + if (piece >= 0 && tor_.has_metainfo() && tor_.is_public() && std::size(peer_requested_metadata_pieces_) < MetadataReqQ) + { + peer_requested_metadata_pieces_.push(piece); + } + else + { + /* send a rejection message */ + auto v = tr_variant{}; + tr_variantInitDict(&v, 2); + tr_variantDictAddInt(&v, TR_KEY_msg_type, MetadataMsgType::Reject); + tr_variantDictAddInt(&v, TR_KEY_piece, piece); + protocol_send_message(BtPeerMsgs::Ltep, ut_metadata_id_, serde.to_string(v)); + } + } +} + +// --- + +ReadResult tr_peerMsgsImpl::process_peer_message(uint8_t id, MessageReader& payload) +{ + bool const fext = io_->supports_fext(); + + auto ui32 = uint32_t{}; + + logtrace( + this, + fmt::format( + "got peer msg '{:s}' ({:d}) with payload len {:d}", + BtPeerMsgs::debug_name(id), + static_cast(id), + std::size(payload))); + + if (!is_message_length_correct(tor_, id, sizeof(id) + std::size(payload))) + { + logdbg( + this, + fmt::format( + "bad msg: '{:s}' ({:d}) with payload len {:d}", + BtPeerMsgs::debug_name(id), + static_cast(id), + std::size(payload))); + publish(tr_peer_event::GotError(EMSGSIZE)); + return { READ_ERR, {} }; + } + + switch (id) + { + case BtPeerMsgs::Choke: + logtrace(this, "got Choke"); + set_client_choked(true); + + if (!fext) + { + publish(tr_peer_event::GotChoke()); + } + + update_active(TR_PEER_TO_CLIENT); + break; + + case BtPeerMsgs::Unchoke: + logtrace(this, "got Unchoke"); + set_client_choked(false); + update_active(TR_PEER_TO_CLIENT); + update_desired_request_count(); + break; + + case BtPeerMsgs::Interested: + logtrace(this, "got Interested"); + set_peer_interested(true); + update_active(TR_CLIENT_TO_PEER); + break; + + case BtPeerMsgs::NotInterested: + logtrace(this, "got Not Interested"); + set_peer_interested(false); + update_active(TR_CLIENT_TO_PEER); + break; + + case BtPeerMsgs::Have: + ui32 = payload.to_uint32(); + logtrace(this, fmt::format("got Have: {:d}", ui32)); + + if (tor_.has_metainfo() && ui32 >= tor_.piece_count()) + { + publish(tr_peer_event::GotError(ERANGE)); + return { READ_ERR, {} }; + } + + /* a peer can send the same HAVE message twice... */ + if (!have_.test(ui32)) + { + have_.set(ui32); + publish(tr_peer_event::GotHave(ui32)); + } + + break; + + case BtPeerMsgs::Bitfield: + logtrace(this, "got a bitfield"); + have_ = tr_bitfield{ tor_.has_metainfo() ? tor_.piece_count() : std::size(payload) * 8 }; + have_.set_raw(reinterpret_cast(std::data(payload)), std::size(payload)); + publish(tr_peer_event::GotBitfield(&have_)); + break; + + case BtPeerMsgs::Request: + { + struct peer_request r; + r.index = payload.to_uint32(); + r.offset = payload.to_uint32(); + r.length = payload.to_uint32(); + logtrace(this, fmt::format("got Request: {:d}:{:d}->{:d}", r.index, r.offset, r.length)); + on_peer_made_request(r); + break; + } + + case BtPeerMsgs::Cancel: + { + struct peer_request r; + r.index = payload.to_uint32(); + r.offset = payload.to_uint32(); + r.length = payload.to_uint32(); + cancels_sent_to_client.add(tr_time(), 1); + logtrace(this, fmt::format("got a Cancel {:d}:{:d}->{:d}", r.index, r.offset, r.length)); + + auto& requests = peer_requested_; + if (auto iter = std::find(std::begin(requests), std::end(requests), r); iter != std::end(requests)) + { + requests.erase(iter); + + // bep6: "Even when a request is cancelled, the peer + // receiving the cancel should respond with either the + // corresponding reject or the corresponding piece" + if (fext) + { + protocol_send_reject(r); + } + } + break; + } + + case BtPeerMsgs::Piece: + return read_piece_data(payload); + + case BtPeerMsgs::Port: + // https://www.bittorrent.org/beps/bep_0005.html + // Peers supporting the DHT set the last bit of the 8-byte reserved flags + // exchanged in the BitTorrent protocol handshake. Peer receiving a handshake + // indicating the remote peer supports the DHT should send a PORT message. + // It begins with byte 0x09 and has a two byte payload containing the UDP + // port of the DHT node in network byte order. + { + logtrace(this, "Got a BtPeerMsgs::Port"); + + auto const hport = payload.to_uint16(); + if (auto const dht_port = tr_port::from_host(hport); !std::empty(dht_port)) + { + dht_port_ = dht_port; + session->maybe_add_dht_node(io_->address(), dht_port_); + } + } + break; + + case BtPeerMsgs::FextSuggest: + logtrace(this, "Got a BtPeerMsgs::FextSuggest"); + + if (fext) + { + auto const piece = payload.to_uint32(); + publish(tr_peer_event::GotSuggest(piece)); + } + else + { + publish(tr_peer_event::GotError(EMSGSIZE)); + return { READ_ERR, {} }; + } + + break; + + case BtPeerMsgs::FextAllowedFast: + logtrace(this, "Got a BtPeerMsgs::FextAllowedFast"); + + if (fext) + { + auto const piece = payload.to_uint32(); + publish(tr_peer_event::GotAllowedFast(piece)); + } + else + { + publish(tr_peer_event::GotError(EMSGSIZE)); + return { READ_ERR, {} }; + } + + break; + + case BtPeerMsgs::FextHaveAll: + logtrace(this, "Got a BtPeerMsgs::FextHaveAll"); + + if (fext) + { + have_.set_has_all(); + publish(tr_peer_event::GotHaveAll()); + } + else + { + publish(tr_peer_event::GotError(EMSGSIZE)); + return { READ_ERR, {} }; + } + + break; + + case BtPeerMsgs::FextHaveNone: + logtrace(this, "Got a BtPeerMsgs::FextHaveNone"); + + if (fext) + { + have_.set_has_none(); + publish(tr_peer_event::GotHaveNone()); + } + else + { + publish(tr_peer_event::GotError(EMSGSIZE)); + return { READ_ERR, {} }; + } + + break; + + case BtPeerMsgs::FextReject: + { + struct peer_request r; + r.index = payload.to_uint32(); + r.offset = payload.to_uint32(); + r.length = payload.to_uint32(); + + if (fext) + { + publish(tr_peer_event::GotRejected(tor_.block_info(), tor_.piece_loc(r.index, r.offset).block)); + } + else + { + publish(tr_peer_event::GotError(EMSGSIZE)); + return { READ_ERR, {} }; + } + + break; + } + + case BtPeerMsgs::Ltep: + logtrace(this, "Got a BtPeerMsgs::Ltep"); + parse_ltep(payload); + break; + + default: + logtrace(this, fmt::format("peer sent us an UNKNOWN: {:d}", static_cast(id))); + break; + } + + return { READ_NOW, {} }; +} + +ReadResult tr_peerMsgsImpl::read_piece_data(MessageReader& payload) +{ + // + auto const piece = payload.to_uint32(); + auto const offset = payload.to_uint32(); + auto const len = std::size(payload); + + auto const loc = tor_.piece_loc(piece, offset); + auto const block = loc.block; + auto const block_size = tor_.block_size(block); + + logtrace(this, fmt::format("got {:d} bytes for req {:d}:{:d}->{:d}", len, piece, offset, len)); + + if (loc.block_offset + len > block_size) + { + logwarn(this, fmt::format("got unaligned piece {:d}:{:d}->{:d}", piece, offset, len)); + return { READ_ERR, len }; + } + + if (!tr_peerMgrDidPeerRequest(&tor_, this, block)) + { + logwarn(this, fmt::format("got unrequested piece {:d}:{:d}->{:d}", piece, offset, len)); + return { READ_ERR, len }; + } + + publish(tr_peer_event::GotPieceData(len)); + + if (loc.block_offset == 0U && len == block_size) // simple case: one message has entire block + { + auto buf = std::make_unique(block_size); + payload.to_buf(std::data(*buf), len); + auto const ok = client_got_block(std::move(buf), block) == 0; + return { ok ? READ_NOW : READ_ERR, len }; + } + + auto& blocks = incoming_.blocks; + auto& incoming_block = blocks.try_emplace(block, block_size).first->second; + payload.to_buf(std::data(*incoming_block.buf) + loc.block_offset, len); + + if (!incoming_block.add_span(loc.block_offset, loc.block_offset + len)) + { + return { READ_ERR, len }; // invalid span + } + + if (!incoming_block.has_all()) + { + return { READ_LATER, len }; // we don't have the full block yet + } + + auto block_buf = std::move(incoming_block.buf); + blocks.erase(block); // note: invalidates `incoming_block` local + auto const ok = client_got_block(std::move(block_buf), block) == 0; + return { ok ? READ_NOW : READ_ERR, len }; +} + +// returns 0 on success, or an errno on failure +int tr_peerMsgsImpl::client_got_block(std::unique_ptr block_data, tr_block_index_t const block) +{ + auto const n_expected = tor_.block_size(block); + + if (!block_data) + { + logdbg(this, fmt::format("wrong block size: expected {:d}, got {:d}", n_expected, 0)); + return EMSGSIZE; + } + + if (std::size(*block_data) != tor_.block_size(block)) + { + logdbg(this, fmt::format("wrong block size: expected {:d}, got {:d}", n_expected, std::size(*block_data))); + return EMSGSIZE; + } + + logtrace(this, fmt::format("got block {:d}", block)); + + if (!tr_peerMgrDidPeerRequest(&tor_, this, block)) + { + logdbg(this, "we didn't ask for this message..."); + return 0; + } + + auto const loc = tor_.block_loc(block); + if (tor_.has_piece(loc.piece)) + { + logtrace(this, "we did ask for this message, but the piece is already complete..."); + return 0; + } + + // NB: if writeBlock() fails the torrent may be paused. + // If this happens, this object will be destructed and must no longer be used. + if (auto const err = session->cache->write_block(tor_.id(), block, std::move(block_data)); err != 0) + { + return err; + } + + blame.set(loc.piece); + publish(tr_peer_event::GotBlock(tor_.block_info(), block)); + + return 0; +} + +// --- + +void tr_peerMsgsImpl::did_write(tr_peerIo* /*io*/, size_t bytes_written, bool was_piece_data, void* vmsgs) +{ + auto* const msgs = static_cast(vmsgs); + + if (was_piece_data) + { + msgs->publish(tr_peer_event::SentPieceData(bytes_written)); + } + + msgs->pulse(); +} + +ReadState tr_peerMsgsImpl::can_read(tr_peerIo* io, void* vmsgs, size_t* piece) +{ + auto* const msgs = static_cast(vmsgs); + + // https://www.bittorrent.org/beps/bep_0003.html + // Next comes an alternating stream of length prefixes and messages. + // Messages of length zero are keepalives, and ignored. + // All non-keepalive messages start with a single byte which gives their type. + // + // https://wiki.theory.org/BitTorrentSpecification + // All of the remaining messages in the protocol take the form of + // . The length prefix is a four byte + // big-endian value. The message ID is a single decimal byte. + // The payload is message dependent. + + // read + auto& current_message_len = msgs->incoming_.length; // the full message payload length. Includes the +1 for id length + if (!current_message_len) + { + auto message_len = uint32_t{}; + if (io->read_buffer_size() < sizeof(message_len)) + { + return READ_LATER; + } + + io->read_uint32(&message_len); + + // The keep-alive message is a message with zero bytes, + // specified with the length prefix set to zero. + // There is no message ID and no payload. + if (message_len == 0U) + { + logtrace(msgs, "got KeepAlive"); + return READ_NOW; + } + + current_message_len = message_len; + } + + // read + auto& current_message_type = msgs->incoming_.id; + if (!current_message_type) + { + auto message_type = uint8_t{}; + if (io->read_buffer_size() < sizeof(message_type)) + { + return READ_LATER; + } + + io->read_uint8(&message_type); + current_message_type = message_type; + } + + // read + auto& current_payload = msgs->incoming_.payload; + auto const full_payload_len = *current_message_len - sizeof(*current_message_type); + auto n_left = full_payload_len - std::size(current_payload); + auto const [buf, n_this_pass] = current_payload.reserve_space(std::min(n_left, io->read_buffer_size())); + io->read_bytes(buf, n_this_pass); + current_payload.commit_space(n_this_pass); + n_left -= n_this_pass; + logtrace(msgs, fmt::format("read {:d} payload bytes; {:d} left to go", n_this_pass, n_left)); + + if (n_left > 0U) + { + return READ_LATER; + } + + // The incoming message is now complete. After processing the message + // with `process_peer_message()`, reset the peerMsgs' incoming + // field so it's ready to receive the next message. + + auto const [read_state, n_piece_bytes_read] = msgs->process_peer_message(*current_message_type, current_payload); + *piece = n_piece_bytes_read; + + current_message_len.reset(); + current_message_type.reset(); + current_payload.clear(); + + return read_state; +} + +void tr_peerMsgsImpl::got_error(tr_peerIo* /*io*/, tr_error const& /*error*/, void* vmsgs) +{ + static_cast(vmsgs)->publish(tr_peer_event::GotError(ENOTCONN)); +} + +// --- + +void tr_peerMsgsImpl::pulse() +{ + auto const now_sec = tr_time(); + auto const now_msec = tr_time_msec(); + + update_desired_request_count(); + update_block_requests(); + update_metadata_requests(now_sec); + + for (;;) + { + if (fill_output_buffer(now_sec, now_msec) == 0U) + { + break; + } + } +} + +void tr_peerMsgsImpl::update_metadata_requests(time_t now) const +{ + if (!peer_supports_metadata_xfer_) + { + return; + } + + if (auto const piece = tor_.get_next_metadata_request(now); piece) + { + auto tmp = tr_variant{}; + tr_variantInitDict(&tmp, 3); + tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Request); + tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece); + protocol_send_message(BtPeerMsgs::Ltep, ut_metadata_id_, tr_variant_serde::benc().to_string(tmp)); + } +} + +void tr_peerMsgsImpl::update_block_requests() +{ + if (!tor_.client_can_download()) + { + return; + } + + auto const n_active = tr_peerMgrCountActiveRequestsToPeer(&tor_, this); + if (n_active >= desired_request_count_) + { + return; + } + + TR_ASSERT(client_is_interested()); + TR_ASSERT(!client_is_choked()); + + auto const n_wanted = desired_request_count_ - n_active; + if (auto const requests = tr_peerMgrGetNextRequests(&tor_, this, n_wanted); !std::empty(requests)) + { + request_blocks(std::data(requests), std::size(requests)); + } +} + +[[nodiscard]] size_t tr_peerMsgsImpl::fill_output_buffer(time_t now_sec, uint64_t now_msec) +{ + auto n_bytes_written = size_t{}; + + // fulfill metadata requests + for (;;) + { + auto const old_len = n_bytes_written; + n_bytes_written += add_next_metadata_piece(); + if (old_len == n_bytes_written) + { + break; + } + } + + // fulfill piece requests + for (;;) + { + auto const old_len = n_bytes_written; + n_bytes_written += add_next_block(now_sec, now_msec); + if (old_len == n_bytes_written) + { + break; + } + } + + if (client_sent_at_ != 0 && now_sec - client_sent_at_ > KeepaliveIntervalSecs) + { + n_bytes_written += protocol_send_keepalive(); + } + + return n_bytes_written; +} + +[[nodiscard]] size_t tr_peerMsgsImpl::add_next_metadata_piece() +{ + auto const piece = pop_next_metadata_request(); + + if (!piece.has_value()) // no pending requests + { + return {}; + } + + auto data = tor_.get_metadata_piece(*piece); + if (!data) + { + // send a reject + auto tmp = tr_variant{}; + tr_variantInitDict(&tmp, 2); + tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Reject); + tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece); + return protocol_send_message(BtPeerMsgs::Ltep, ut_metadata_id_, tr_variant_serde::benc().to_string(tmp)); + } + + // send the metadata + auto tmp = tr_variant{}; + tr_variantInitDict(&tmp, 3); + tr_variantDictAddInt(&tmp, TR_KEY_msg_type, MetadataMsgType::Data); + tr_variantDictAddInt(&tmp, TR_KEY_piece, *piece); + tr_variantDictAddInt(&tmp, TR_KEY_total_size, tor_.info_dict_size()); + return protocol_send_message(BtPeerMsgs::Ltep, ut_metadata_id_, tr_variant_serde::benc().to_string(tmp), *data); +} + +[[nodiscard]] size_t tr_peerMsgsImpl::add_next_block(time_t now_sec, uint64_t now_msec) +{ + if (std::empty(peer_requested_) || io_->get_write_buffer_space(now_msec) == 0U) + { + return {}; + } + + auto const req = peer_requested_.front(); + peer_requested_.pop_front(); + + auto buf = std::array{}; + auto ok = is_valid_request(req) && tor_.has_piece(req.index); + + if (ok) + { + ok = tor_.ensure_piece_is_checked(req.index); + + if (!ok) + { + tor_.error().set_local_error(fmt::format("Please Verify Local Data! Piece #{:d} is corrupt.", req.index)); + } + } + + if (ok) + { + ok = session->cache->read_block(tor_, tor_.piece_loc(req.index, req.offset), req.length, std::data(buf)) == 0; + } + + if (ok) + { + blocks_sent_to_peer.add(now_sec, 1); + auto const piece_data = std::string_view{ reinterpret_cast(std::data(buf)), req.length }; + return protocol_send_message(BtPeerMsgs::Piece, req.index, req.offset, piece_data); + } + + if (io_->supports_fext()) + { + return protocol_send_reject(req); + } + + return {}; +} + +// --- + +bool tr_peerMsgsImpl::is_valid_request(peer_request const& req) const +{ + int err = 0; + + if (req.index >= tor_.piece_count()) + { + err = 1; + } + else if (req.length < 1) + { + err = 2; + } + else if (req.offset + req.length > tor_.piece_size(req.index)) + { + err = 3; + } + else if (req.length > tr_block_info::BlockSize) + { + err = 4; + } + else if (tor_.piece_loc(req.index, req.offset, req.length).byte > tor_.total_size()) + { + err = 5; + } + + if (err != 0) + { + tr_logAddTraceTor(&tor_, fmt::format("index {} offset {} length {} err {}", req.index, req.offset, req.length, err)); + } + + return err == 0; +} + +[[nodiscard]] bool tr_peerMsgsImpl::can_add_request_from_peer(peer_request const& req) +{ + if (peer_is_choked()) + { + logtrace(this, "rejecting request from choked peer"); + return false; + } + + if (std::size(peer_requested_) >= ReqQ) + { + logtrace(this, "rejecting request ... reqq is full"); + return false; + } + + if (!is_valid_request(req)) + { + logtrace(this, "rejecting an invalid request."); + return false; + } + + if (!tor_.has_piece(req.index)) + { + logtrace(this, "rejecting request for a piece we don't have."); + return false; + } + + return true; +} + +size_t tr_peerMsgsImpl::max_available_reqs() const +{ + if (tor_.is_done() || !tor_.has_metainfo() || client_is_choked() || !client_is_interested()) + { + return 0; + } + + // Get the rate limit we should use. + // TODO: this needs to consider all the other peers as well... + uint64_t const now = tr_time_msec(); + auto rate = get_piece_speed(now, TR_PEER_TO_CLIENT); + if (tor_.uses_speed_limit(TR_PEER_TO_CLIENT)) + { + rate = std::min(rate, tor_.speed_limit(TR_PEER_TO_CLIENT)); + } + + // honor the session limits, if enabled + if (tor_.uses_session_limits()) + { + if (auto const limit = session->active_speed_limit(TR_PEER_TO_CLIENT)) + { + rate = std::min(rate, *limit); + } + } + + // use this desired rate to figure out how + // many requests we should send to this peer + static auto constexpr Floor = size_t{ 32 }; + static size_t constexpr Seconds = RequestBufSecs; + size_t const estimated_blocks_in_period = (rate.base_quantity() * Seconds) / tr_block_info::BlockSize; + auto const ceil = reqq_.value_or(250); + + return std::clamp(estimated_blocks_in_period, Floor, ceil); } } // namespace tr_peerMsgs::tr_peerMsgs( - tr_torrent const* tor, + tr_torrent const& tor, tr_peer_info* peer_info_in, tr_interned_string user_agent, bool connection_is_encrypted, @@ -2095,17 +2073,17 @@ tr_peerMsgs::tr_peerMsgs( tr_peerMsgs::~tr_peerMsgs() { peer_info->set_connected(tr_time(), false); - [[maybe_unused]] auto const n_prev = n_peers--; - TR_ASSERT(n_prev > 0U); + TR_ASSERT(n_peers > 0U); + --n_peers; } -tr_peerMsgs* tr_peerMsgsNew( - tr_torrent* const torrent, +tr_peerMsgs* tr_peerMsgs::create( + tr_torrent& torrent, tr_peer_info* const peer_info, std::shared_ptr io, tr_interned_string user_agent, tr_peer_callback_bt callback, void* callback_data) { - return new tr_peerMsgsImpl(torrent, peer_info, std::move(io), user_agent, callback, callback_data); + return new tr_peerMsgsImpl{ torrent, peer_info, std::move(io), user_agent, callback, callback_data }; } diff --git a/libtransmission/peer-msgs.h b/libtransmission/peer-msgs.h index 52f62e2f2..2017a9f89 100644 --- a/libtransmission/peer-msgs.h +++ b/libtransmission/peer-msgs.h @@ -36,7 +36,7 @@ class tr_peerMsgs : public tr_peer { public: tr_peerMsgs( - tr_torrent const* tor, + tr_torrent const& tor, tr_peer_info* peer_info_in, tr_interned_string user_agent, bool connection_is_encrypted, @@ -97,17 +97,23 @@ public: [[nodiscard]] virtual tr_socket_address socket_address() const = 0; - virtual void cancel_block_request(tr_block_index_t block) = 0; - virtual void set_choke(bool peer_is_choked) = 0; virtual void set_interested(bool client_is_interested) = 0; virtual void pulse() = 0; - virtual void onTorrentGotMetainfo() = 0; + virtual void on_torrent_got_metainfo() noexcept = 0; virtual void on_piece_completed(tr_piece_index_t) = 0; + static tr_peerMsgs* create( + tr_torrent& torrent, + tr_peer_info* peer_info, + std::shared_ptr io, + tr_interned_string user_agent, + tr_peer_callback_bt callback, + void* callback_data); + protected: constexpr void set_client_choked(bool val) noexcept { @@ -169,12 +175,4 @@ private: bool peer_is_interested_ = false; }; -tr_peerMsgs* tr_peerMsgsNew( - tr_torrent* torrent, - tr_peer_info* peer_info, - std::shared_ptr io, - tr_interned_string user_agent, - tr_peer_callback_bt callback, - void* callback_data); - /* @} */ diff --git a/libtransmission/torrent-metainfo.cc b/libtransmission/torrent-metainfo.cc index 9cdb8fbad..9ad4cf99e 100644 --- a/libtransmission/torrent-metainfo.cc +++ b/libtransmission/torrent-metainfo.cc @@ -32,7 +32,7 @@ using namespace std::literals; /** * @brief Ensure that the URLs for multfile torrents end in a slash. * - * See http://bittorrent.org/beps/bep_0019.html#metadata-extension + * See https://www.bittorrent.org/beps/bep_0019.html#metadata-extension * for background on how the trailing slash is used for "url-list" * fields. * @@ -513,7 +513,7 @@ private: bool finish(Context const& context) { // bittorrent 1.0 spec - // http://bittorrent.org/beps/bep_0003.html + // https://www.bittorrent.org/beps/bep_0003.html // // "There is also a key length or a key files, but not both or neither. // diff --git a/libtransmission/torrent-metainfo.h b/libtransmission/torrent-metainfo.h index f555b60e6..d878646e9 100644 --- a/libtransmission/torrent-metainfo.h +++ b/libtransmission/torrent-metainfo.h @@ -231,7 +231,7 @@ private: // Offset + size of the bencoded info dict subset of the bencoded data. // Used when loading pieces of it to sent to magnet peers. - // See http://bittorrent.org/beps/bep_0009.html + // See https://www.bittorrent.org/beps/bep_0009.html uint64_t info_dict_size_ = 0; uint64_t info_dict_offset_ = 0; diff --git a/libtransmission/webseed.cc b/libtransmission/webseed.cc index fbb04f17d..d6ab51a43 100644 --- a/libtransmission/webseed.cc +++ b/libtransmission/webseed.cc @@ -168,15 +168,25 @@ void onBufferGotData(evbuffer* /*buf*/, evbuffer_cb_info const* info, void* vtas class tr_webseed final : public tr_peer { public: - tr_webseed(struct tr_torrent* tor, std::string_view url, tr_peer_callback_webseed callback_in, void* callback_data_in) + struct RequestLimit + { + // How many spans those blocks could be in. + // This is for webseeds, which make parallel requests. + size_t max_spans = 0; + + // How many blocks we could request. + size_t max_blocks = 0; + }; + + tr_webseed(tr_torrent& tor, std::string_view url, tr_peer_callback_webseed callback_in, void* callback_data_in) : tr_peer{ tor } - , torrent_id{ tr_torrentId(tor) } + , torrent_id{ tor.id() } , base_url{ url } , callback{ callback_in } , callback_data{ callback_data_in } , idle_timer_{ session->timerMaker().create([this]() { on_idle(this); }) } - , have_{ tor->piece_count() } - , bandwidth_{ &tor->bandwidth() } + , have_{ tor.piece_count() } + , bandwidth_{ &tor.bandwidth() } { have_.set_has_all(); idle_timer_->start_repeating(IdleTimerInterval); @@ -204,7 +214,7 @@ public: return dir == TR_DOWN ? bandwidth_.get_piece_speed(now, dir) : Speed{}; } - [[nodiscard]] TR_CONSTEXPR20 size_t activeReqCount(tr_direction dir) const noexcept override + [[nodiscard]] TR_CONSTEXPR20 size_t active_req_count(tr_direction dir) const noexcept override { if (dir == TR_CLIENT_TO_PEER) // blocks we've requested { @@ -250,7 +260,7 @@ public: } } - void requestBlocks(tr_block_span_t const* block_spans, size_t n_spans) override + void request_blocks(tr_block_span_t const* block_spans, size_t n_spans) override { auto* const tor = getTorrent(); if (tor == nullptr || !tor->is_running() || tor->is_done()) @@ -269,7 +279,7 @@ public: } } - [[nodiscard]] RequestLimit canRequest() const noexcept override + [[nodiscard]] RequestLimit max_available_reqs() const noexcept { auto const n_slots = connection_limiter.slotsAvailable(); if (n_slots == 0) @@ -285,7 +295,7 @@ public: // Prefer to request large, contiguous chunks from webseeds. // The actual value of '64' is arbitrary here; // we could probably be smarter about this. - auto constexpr PreferredBlocksPerTask = size_t{ 64 }; + static auto constexpr PreferredBlocksPerTask = size_t{ 64 }; return { n_slots, n_slots * PreferredBlocksPerTask }; } @@ -413,7 +423,7 @@ void onBufferGotData(evbuffer* /*buf*/, evbuffer_cb_info const* info, void* vtas void on_idle(tr_webseed* webseed) { - auto const [max_spans, max_blocks] = webseed->canRequest(); + auto const [max_spans, max_blocks] = webseed->max_available_reqs(); if (max_spans == 0 || max_blocks == 0) { return; @@ -427,7 +437,7 @@ void on_idle(tr_webseed* webseed) { spans.resize(max_spans); } - webseed->requestBlocks(std::data(spans), std::size(spans)); + webseed->request_blocks(std::data(spans), std::size(spans)); } void onPartialDataFetched(tr_web::FetchResponse const& web_response) @@ -523,9 +533,9 @@ void task_request_next_chunk(tr_webseed_task* task) // --- -tr_peer* tr_webseedNew(tr_torrent* torrent, std::string_view url, tr_peer_callback_webseed callback, void* callback_data) +tr_peer* tr_webseedNew(tr_torrent& torrent, std::string_view url, tr_peer_callback_webseed callback, void* callback_data) { - return new tr_webseed(torrent, url, callback, callback_data); + return new tr_webseed{ torrent, url, callback, callback_data }; } tr_webseed_view tr_webseedView(tr_peer const* peer) diff --git a/libtransmission/webseed.h b/libtransmission/webseed.h index f064abd67..3ac1bf434 100644 --- a/libtransmission/webseed.h +++ b/libtransmission/webseed.h @@ -17,6 +17,6 @@ using tr_peer_callback_webseed = tr_peer_callback_generic; -tr_peer* tr_webseedNew(struct tr_torrent* torrent, std::string_view, tr_peer_callback_webseed callback, void* callback_data); +tr_peer* tr_webseedNew(tr_torrent& torrent, std::string_view, tr_peer_callback_webseed callback, void* callback_data); tr_webseed_view tr_webseedView(tr_peer const* peer); diff --git a/tests/libtransmission/completion-test.cc b/tests/libtransmission/completion-test.cc index 25f0a826c..da3c57c0a 100644 --- a/tests/libtransmission/completion-test.cc +++ b/tests/libtransmission/completion-test.cc @@ -187,7 +187,7 @@ TEST_F(CompletionTest, percentCompleteAndDone) EXPECT_DOUBLE_EQ(0.5, completion.percent_done()); // but marking some of the pieces we have as unwanted - // should not change percentDone + // should not change percent_done for (size_t i = 0; i < 16; ++i) { torrent.dnd_pieces.insert(i); @@ -197,7 +197,7 @@ TEST_F(CompletionTest, percentCompleteAndDone) EXPECT_DOUBLE_EQ(0.5, completion.percent_done()); // but marking some of the pieces we DON'T have as unwanted - // SHOULD change percentDone + // SHOULD change percent_done for (size_t i = 32; i < 48; ++i) { torrent.dnd_pieces.insert(i);