diff --git a/libtransmission/peer-mgr.cc b/libtransmission/peer-mgr.cc index 3f19c4a92..df4ead4d4 100644 --- a/libtransmission/peer-mgr.cc +++ b/libtransmission/peer-mgr.cc @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -283,7 +284,7 @@ class tr_swarm { public: using Peers = std::vector; - using Pool = std::unordered_map; + using Pool = small::map>; class WishlistMediator final : public Wishlist::Mediator { @@ -377,12 +378,16 @@ public: void remove_inactive_peer_info() noexcept { auto const now = tr_time(); - for (auto iter = std::begin(connectable_pool), end = std::end(connectable_pool); iter != end;) + + // N.B. Unlike `std::map`, erasing elements in `small::map` seems to invalidate + // iterators other than the one being erased. So make sure `std::end()` is called + // every iteration + for (auto iter = std::begin(connectable_pool); iter != std::end(connectable_pool);) { - auto& [socket_address, peer_info] = *iter; - if (peer_info.is_inactive(now)) + auto const& [socket_address, peer_info] = *iter; + if (peer_info->is_inactive(now)) { - --stats.known_peer_from_count[peer_info.from_first()]; + --stats.known_peer_from_count[peer_info->from_first()]; iter = connectable_pool.erase(iter); } else @@ -416,10 +421,8 @@ public: peer_disconnect.emit(tor, peer->has()); - auto* const peer_info = peer->peer_info; - auto const socket_address = peer->socket_address(); - [[maybe_unused]] auto const is_incoming = peer->is_incoming_connection(); - TR_ASSERT(peer_info != nullptr); + auto const& peer_info = peer->peer_info; + TR_ASSERT(peer_info); --stats.peer_count; --stats.peer_from_count[peer_info->from_first()]; @@ -431,17 +434,6 @@ public: } delete peer; - - if (std::empty(peer_info->listen_port())) // is not connectable - { - TR_ASSERT(is_incoming); - [[maybe_unused]] auto const count = incoming_pool.erase(socket_address); - TR_ASSERT(count != 0U); - } - else - { - graveyard_pool.erase(peer_info->listen_socket_address()); - } } void remove_all_peers() @@ -475,37 +467,41 @@ public: pool_is_all_seeds_ = std::all_of( std::begin(connectable_pool), std::end(connectable_pool), - [](auto const& key_val) { return key_val.second.is_seed(); }); + [](auto const& key_val) { return key_val.second->is_seed(); }); } return *pool_is_all_seeds_; } - [[nodiscard]] tr_peer_info* get_existing_peer_info(tr_socket_address const& socket_address) noexcept + [[nodiscard]] std::shared_ptr get_existing_peer_info(tr_socket_address const& socket_address) const noexcept { - auto&& it = connectable_pool.find(socket_address); - return it != connectable_pool.end() ? &it->second : nullptr; + if (auto it = connectable_pool.find(socket_address); it != std::end(connectable_pool)) + { + return it->second; + } + + return {}; } - tr_peer_info& ensure_info_exists( + std::shared_ptr ensure_info_exists( tr_socket_address const& socket_address, uint8_t const flags, - tr_peer_from const from, - bool is_connectable) + tr_peer_from const from) { TR_ASSERT(socket_address.is_valid()); TR_ASSERT(from < TR_PEER_FROM__MAX); - auto&& [it, is_new] = is_connectable ? connectable_pool.try_emplace(socket_address, socket_address, flags, from) : - incoming_pool.try_emplace(socket_address, socket_address.address(), flags, from); - auto& peer_info = it->second; - if (!is_new) + auto peer_info = get_existing_peer_info(socket_address); + if (peer_info) { - peer_info.found_at(from); - peer_info.set_pex_flags(flags); + peer_info->found_at(from); + peer_info->set_pex_flags(flags); } - else if (is_connectable) + else { + peer_info = connectable_pool + .try_emplace(socket_address, std::make_shared(socket_address, flags, from)) + .first->second; ++stats.known_peer_from_count[from]; } @@ -568,19 +564,13 @@ public: break; case tr_peer_event::Type::ClientGotPort: - if (std::empty(event.port)) + // We have 2 cases: + // 1. We don't know the listening port of this peer (i.e. incoming connection and first time ClientGotPort) + // 2. We got a new listening port from a known peer + if (auto const& info = msgs->peer_info; + !std::empty(event.port) && info && (std::empty(info->listen_port()) || info->listen_port() != event.port)) { - // Do nothing - } - // If we don't know the listening port of this peer (i.e. incoming connection and first time ClientGotPort) - else if (auto const& info = *msgs->peer_info; std::empty(info.listen_port())) - { - s->on_got_port(msgs, event, false); - } - // If we got a new listening port from a known connectable peer - else if (info.listen_port() != event.port) - { - s->on_got_port(msgs, event, true); + s->on_got_port(msgs, event); } break; @@ -622,9 +612,6 @@ public: std::unique_ptr wishlist; - // tr_peerMsgs hold pointers to the items in these containers, - // therefore references to elements within cannot invalidate - Pool incoming_pool; Pool connectable_pool; tr_peerMsgs* optimistic = nullptr; /* the optimistic peer, or nullptr if none */ @@ -674,7 +661,7 @@ private: remove_all_peers(); for (auto& [sockaddr, peer_info] : connectable_pool) { - peer_info.destroy_handshake(); + peer_info->destroy_handshake(); } } @@ -717,9 +704,9 @@ private: { auto const lock = unique_lock(); - for (auto& [socket_address, atom] : connectable_pool) + for (auto const& [socket_address, peer_info] : connectable_pool) { - mark_peer_as_seed(atom); + mark_peer_as_seed(*peer_info); } mark_all_seeds_flag_dirty(); @@ -862,89 +849,75 @@ private: tor->session->add_downloaded(sent_length); } - void on_got_port(tr_peerMsgs* const msgs, tr_peer_event const& event, bool was_connectable) + void on_got_port(tr_peerMsgs* const msgs, tr_peer_event const& event) { - auto& info_this = *msgs->peer_info; - TR_ASSERT(info_this.is_connected()); - TR_ASSERT(was_connectable != std::empty(info_this.listen_port())); + auto info_this = msgs->peer_info; + TR_ASSERT(info_this->is_connected()); + TR_ASSERT(info_this->listen_port() != event.port); - // If we already know about this peer, merge the info objects without invalidating references - if (auto it_that = connectable_pool.find({ info_this.listen_address(), event.port }); + // we already know about this peer + if (auto it_that = connectable_pool.find({ info_this->listen_address(), event.port }); it_that != std::end(connectable_pool)) { - auto& info_that = it_that->second; - TR_ASSERT(it_that->first == info_that.listen_socket_address()); - TR_ASSERT(it_that->first.address() == info_this.listen_address()); - TR_ASSERT(it_that->first.port() != info_this.listen_port()); + auto const info_that = it_that->second; + TR_ASSERT(it_that->first == info_that->listen_socket_address()); + TR_ASSERT(it_that->first.address() == info_this->listen_address()); + TR_ASSERT(it_that->first.port() != info_this->listen_port()); - // If there is an existing connection to this peer, keep the better one - if (info_that.is_connected() && on_got_port_duplicate_connection(msgs, it_that, was_connectable)) + // if there is an existing connection to this peer, keep the better one + if (info_that->is_connected() && on_got_port_duplicate_connection(msgs, info_that)) { - return; + goto EXIT; // NOLINT cppcoreguidelines-avoid-goto } - info_this.merge(info_that); - auto from = info_that.from_first(); - stats.known_peer_from_count[from] -= connectable_pool.erase(info_that.listen_socket_address()); + // merge the peer info objects + info_this->merge(*info_that); + + // info_that will be replaced by info_this later, so decrement stat + --stats.known_peer_from_count[info_that->from_first()]; } - else if (!was_connectable) + // we are going to insert a brand-new peer info object to the pool + else if (std::empty(info_this->listen_port())) { - info_this.set_connectable(); + info_this->set_connectable(); } - auto nh = was_connectable ? connectable_pool.extract(info_this.listen_socket_address()) : - incoming_pool.extract(msgs->socket_address()); - TR_ASSERT(!std::empty(nh)); - if (was_connectable) - { - TR_ASSERT(nh.key() == nh.mapped().listen_socket_address()); - } - else - { - ++stats.known_peer_from_count[nh.mapped().from_first()]; - TR_ASSERT(nh.key().address() == nh.mapped().listen_address()); - } - nh.key().port_ = event.port; - [[maybe_unused]] auto const inserted = connectable_pool.insert(std::move(nh)).inserted; - TR_ASSERT(inserted); - info_this.set_listen_port(event.port); + // erase the old peer info entry + stats.known_peer_from_count[info_this->from_first()] -= connectable_pool.erase(info_this->listen_socket_address()); + // set new listen port + info_this->set_listen_port(event.port); + + // insert or replace the peer info ptr at the target location + ++stats.known_peer_from_count[info_this->from_first()]; + connectable_pool.insert_or_assign(info_this->listen_socket_address(), std::move(info_this)); + +EXIT: mark_all_seeds_flag_dirty(); } - bool on_got_port_duplicate_connection(tr_peerMsgs* const msgs, Pool::iterator& it_that, bool was_connectable) + bool on_got_port_duplicate_connection(tr_peerMsgs* const msgs, std::shared_ptr info_that) { - auto& info_this = *msgs->peer_info; - auto& info_that = it_that->second; + auto const info_this = msgs->peer_info; - TR_ASSERT(info_that.is_connected()); + TR_ASSERT(info_that->is_connected()); - if (CompareAtomsByUsefulness(info_this, info_that)) + if (CompareAtomsByUsefulness(*info_this, *info_that)) { - auto it = std::find_if( + auto const it = std::find_if( std::begin(peers), std::end(peers), - [&info_that](tr_peerMsgs const* const peer) { return peer->peer_info == &info_that; }); + [&info_that](tr_peerMsgs const* const peer) { return peer->peer_info == info_that; }); TR_ASSERT(it != std::end(peers)); (*it)->do_purge = true; - --stats.known_peer_from_count[info_that.from_first()]; - // Note that it_that is invalid after this point - graveyard_pool.insert(connectable_pool.extract(it_that)); - return false; } - info_that.merge(info_this); + info_that->merge(*info_this); msgs->do_purge = true; + stats.known_peer_from_count[info_this->from_first()] -= connectable_pool.erase(info_this->listen_socket_address()); - if (was_connectable) - { - --stats.known_peer_from_count[info_this.from_first()]; - graveyard_pool.insert(connectable_pool.extract(info_this.listen_socket_address())); - } - - mark_all_seeds_flag_dirty(); return true; } @@ -958,10 +931,6 @@ private: std::array const tags_; - // tr_peerMsgs hold pointers to the items in these containers, - // therefore references to elements within cannot invalidate - Pool graveyard_pool; - mutable std::optional pool_is_all_seeds_; bool is_endgame_ = false; @@ -1171,12 +1140,14 @@ private: since the blocklist has changed, erase that cached value */ for (auto* const tor : torrents_) { - for (auto& pool : { std::ref(tor->swarm->connectable_pool), std::ref(tor->swarm->incoming_pool) }) + for (auto const& [socket_address, peer_info] : tor->swarm->connectable_pool) { - for (auto& [socket_address, atom] : pool.get()) - { - atom.set_blocklisted_dirty(); - } + peer_info->set_blocklisted_dirty(); + } + + for (auto* const peer : tor->swarm->peers) + { + peer->peer_info->set_blocklisted_dirty(); } } } @@ -1291,22 +1262,26 @@ namespace { namespace handshake_helpers { -void create_bit_torrent_peer(tr_torrent& tor, std::shared_ptr io, tr_peer_info* peer_info, tr_interned_string client) +void create_bit_torrent_peer( + tr_torrent& tor, + std::shared_ptr io, + std::shared_ptr peer_info, + tr_interned_string client) { - TR_ASSERT(peer_info != nullptr); + TR_ASSERT(peer_info); TR_ASSERT(tor.swarm != nullptr); tr_swarm* swarm = tor.swarm; - auto* peer = tr_peerMsgs::create(tor, peer_info, std::move(io), client, &tr_swarm::peer_callback_bt, swarm); - - swarm->peers.push_back(peer); + auto* const + msgs = tr_peerMsgs::create(tor, std::move(peer_info), std::move(io), client, &tr_swarm::peer_callback_bt, swarm); + swarm->peers.push_back(msgs); ++swarm->stats.peer_count; - ++swarm->stats.peer_from_count[peer_info->from_first()]; + ++swarm->stats.peer_from_count[msgs->peer_info->from_first()]; TR_ASSERT(swarm->stats.peer_count == swarm->peerCount()); - TR_ASSERT(swarm->stats.peer_from_count[peer_info->from_first()] <= swarm->stats.peer_count); + TR_ASSERT(swarm->stats.peer_from_count[msgs->peer_info->from_first()] <= swarm->stats.peer_count); } /* FIXME: this is kind of a mess. */ @@ -1317,20 +1292,20 @@ void create_bit_torrent_peer(tr_torrent& tor, std::shared_ptr io, tr_ TR_ASSERT(result.io != nullptr); auto const& socket_address = result.io->socket_address(); auto* const swarm = manager->get_existing_swarm(result.io->torrent_hash()); - auto* info = swarm != nullptr ? swarm->get_existing_peer_info(socket_address) : nullptr; + auto info = swarm != nullptr ? swarm->get_existing_peer_info(socket_address) : std::shared_ptr{}; if (result.io->is_incoming()) { manager->incoming_handshakes.erase(socket_address); } - else if (info != nullptr) + else if (info) { info->destroy_handshake(); } if (!result.is_connected || swarm == nullptr || !swarm->is_running) { - if (info != nullptr && !info->is_connected()) + if (info && !info->is_connected()) { info->on_connection_failed(); @@ -1351,10 +1326,10 @@ void create_bit_torrent_peer(tr_torrent& tor, std::shared_ptr io, tr_ if (result.io->is_incoming()) { - info = &swarm->ensure_info_exists(socket_address, 0U, TR_PEER_FROM_INCOMING, false); + info = std::make_shared(socket_address.address(), 0U, TR_PEER_FROM_INCOMING); } - if (info == nullptr) + if (!info) { return false; } @@ -1395,7 +1370,7 @@ void create_bit_torrent_peer(tr_torrent& tor, std::shared_ptr io, tr_ } result.io->set_bandwidth(&swarm->tor->bandwidth()); - create_bit_torrent_peer(*swarm->tor, result.io, info, client); + create_bit_torrent_peer(*swarm->tor, result.io, std::move(info), client); return true; } @@ -1444,7 +1419,7 @@ size_t tr_peerMgrAddPex(tr_torrent* tor, tr_peer_from from, tr_pex const* pex, s { // we store this peer since it is supposedly connectable (socket address should be the peer's listening address) // don't care about non-connectable peers that we are not connected to - s->ensure_info_exists(pex->socket_address, pex->flags, from, true); + s->ensure_info_exists(pex->socket_address, pex->flags, from); ++n_used; } } @@ -1556,7 +1531,7 @@ std::vector tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t address_ty { if (peer->socket_address().address().type == address_type) { - infos.emplace_back(peer->peer_info); + infos.emplace_back(peer->peer_info.get()); } } } @@ -1566,10 +1541,10 @@ std::vector tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t address_ty infos.reserve(std::size(pool)); for (auto const& [socket_address, peer_info] : pool) { - TR_ASSERT(socket_address == peer_info.listen_socket_address()); - if (socket_address.address().type == address_type && is_peer_interesting(tor, peer_info)) + TR_ASSERT(socket_address == peer_info->listen_socket_address()); + if (socket_address.address().type == address_type && is_peer_interesting(tor, *peer_info)) { - infos.emplace_back(&peer_info); + infos.emplace_back(peer_info.get()); } } } @@ -2151,7 +2126,7 @@ auto constexpr MaxUploadIdleSecs = time_t{ 60 * 5 }; } auto const* tor = s->tor; - auto const* const info = peer->peer_info; + auto const& info = peer->peer_info; /* disconnect if we're both seeds and enough time has passed for PEX */ if (tor->is_done() && peer->is_seed()) @@ -2195,7 +2170,7 @@ void close_peer(tr_peerMsgs* peer) constexpr struct { - [[nodiscard]] constexpr static int compare(tr_peerMsgs const* a, tr_peerMsgs const* b) // <=> + [[nodiscard]] static int compare(tr_peerMsgs const* a, tr_peerMsgs const* b) // <=> { if (a->do_purge != b->do_purge) { @@ -2205,7 +2180,7 @@ constexpr struct return -a->peer_info->compare_by_piece_data_time(*b->peer_info); } - [[nodiscard]] constexpr bool operator()(tr_peerMsgs const* a, tr_peerMsgs const* b) const // less than + [[nodiscard]] bool operator()(tr_peerMsgs const* a, tr_peerMsgs const* b) const // less than { return compare(a, b) < 0; } @@ -2419,22 +2394,6 @@ namespace connect_helpers return true; } -struct peer_candidate -{ - peer_candidate() = default; - - peer_candidate(uint64_t score_in, tr_torrent const* const tor_in, tr_peer_info const* const peer_info_in) - : score{ score_in } - , tor{ tor_in } - , peer_info{ peer_info_in } - { - } - - uint64_t score; - tr_torrent const* tor; - tr_peer_info const* peer_info; -}; - [[nodiscard]] constexpr uint64_t addValToKey(uint64_t value, unsigned int width, uint64_t addme) { value <<= width; @@ -2506,6 +2465,22 @@ struct peer_candidate void get_peer_candidates(size_t global_peer_limit, tr_torrents& torrents, tr_peerMgr::OutboundCandidates& setme) { + struct peer_candidate + { + peer_candidate() = default; + + peer_candidate(uint64_t score_in, tr_torrent const* const tor_in, tr_peer_info const* const peer_info_in) + : score{ score_in } + , tor{ tor_in } + , peer_info{ peer_info_in } + { + } + + uint64_t score; + tr_torrent const* tor; + tr_peer_info const* peer_info; + }; + setme.clear(); auto const now = tr_time(); @@ -2551,24 +2526,24 @@ void get_peer_candidates(size_t global_peer_limit, tr_torrents& torrents, tr_pee continue; } - for (auto const& [socket_address, atom] : swarm->connectable_pool) + for (auto const& [socket_address, peer_info] : swarm->connectable_pool) { - if (is_peer_candidate(tor, atom, now)) + if (is_peer_candidate(tor, *peer_info, now)) { - candidates.emplace_back(getPeerCandidateScore(tor, atom, salter()), tor, &atom); + candidates.emplace_back(getPeerCandidateScore(tor, *peer_info, salter()), tor, peer_info.get()); } } } // only keep the best `max` candidates - if (auto const max = tr_peerMgr::OutboundCandidates::requested_inline_size; max < std::size(candidates)) + if (static auto constexpr Max = tr_peerMgr::OutboundCandidates::requested_inline_size; Max < std::size(candidates)) { std::partial_sort( std::begin(candidates), - std::begin(candidates) + max, + std::begin(candidates) + Max, std::end(candidates), [](auto const& a, auto const& b) { return a.score < b.score; }); - candidates.resize(max); + candidates.resize(Max); } // put the best candidates at the end of the list @@ -2638,14 +2613,13 @@ void tr_peerMgr::make_new_peer_connections() // initiate connections to the last N candidates auto const n_this_pass = std::min(std::size(candidates), MaxConnectionsPerPulse); - auto const it_end = std::crbegin(candidates) + n_this_pass; - for (auto it = std::crbegin(candidates); it != it_end; ++it) + for (auto it = std::crbegin(candidates), end = std::crbegin(candidates) + n_this_pass; it != end; ++it) { auto const& [tor_id, sock_addr] = *it; if (auto* const tor = torrents_.get(tor_id); tor != nullptr) { - if (auto* const peer_info = tor->swarm->get_existing_peer_info(sock_addr); peer_info != nullptr) + if (auto const& peer_info = tor->swarm->get_existing_peer_info(sock_addr)) { initiate_connection(this, tor->swarm, *peer_info); } @@ -2660,7 +2634,7 @@ void HandshakeMediator::set_utp_failed(tr_sha1_digest_t const& info_hash, tr_soc { if (auto* const tor = torrents_.get(info_hash); tor != nullptr) { - if (auto* const peer_info = tor->swarm->get_existing_peer_info(socket_address); peer_info != nullptr) + if (auto const& peer_info = tor->swarm->get_existing_peer_info(socket_address)) { peer_info->set_utp_supported(false); } diff --git a/libtransmission/peer-msgs.cc b/libtransmission/peer-msgs.cc index b521404c9..ce9d66f03 100644 --- a/libtransmission/peer-msgs.cc +++ b/libtransmission/peer-msgs.cc @@ -305,12 +305,12 @@ class tr_peerMsgsImpl final : public tr_peerMsgs public: tr_peerMsgsImpl( tr_torrent& torrent_in, - tr_peer_info* const peer_info_in, + std::shared_ptr peer_info_in, std::shared_ptr io_in, tr_interned_string client, tr_peer_callback_bt callback, void* callback_data) - : tr_peerMsgs{ torrent_in, peer_info_in, client, io_in->is_encrypted(), io_in->is_incoming(), io_in->is_utp() } + : tr_peerMsgs{ torrent_in, std::move(peer_info_in), client, io_in->is_encrypted(), io_in->is_incoming(), io_in->is_utp() } , tor_{ torrent_in } , io_{ std::move(io_in) } , have_{ torrent_in.piece_count() } @@ -2054,13 +2054,13 @@ size_t tr_peerMsgsImpl::max_available_reqs() const tr_peerMsgs::tr_peerMsgs( tr_torrent const& tor, - tr_peer_info* peer_info_in, + std::shared_ptr peer_info_in, tr_interned_string user_agent, bool connection_is_encrypted, bool connection_is_incoming, bool connection_is_utp) : tr_peer{ tor } - , peer_info{ peer_info_in } + , peer_info{ std::move(peer_info_in) } , user_agent_{ user_agent } , connection_is_encrypted_{ connection_is_encrypted } , connection_is_incoming_{ connection_is_incoming } @@ -2079,11 +2079,11 @@ tr_peerMsgs::~tr_peerMsgs() tr_peerMsgs* tr_peerMsgs::create( tr_torrent& torrent, - tr_peer_info* const peer_info, + std::shared_ptr peer_info, std::shared_ptr io, tr_interned_string user_agent, tr_peer_callback_bt callback, void* callback_data) { - return new tr_peerMsgsImpl{ torrent, peer_info, std::move(io), user_agent, callback, callback_data }; + return new tr_peerMsgsImpl{ torrent, std::move(peer_info), std::move(io), user_agent, callback, callback_data }; } diff --git a/libtransmission/peer-msgs.h b/libtransmission/peer-msgs.h index 2017a9f89..783473df0 100644 --- a/libtransmission/peer-msgs.h +++ b/libtransmission/peer-msgs.h @@ -37,7 +37,7 @@ class tr_peerMsgs : public tr_peer public: tr_peerMsgs( tr_torrent const& tor, - tr_peer_info* peer_info_in, + std::shared_ptr peer_info_in, tr_interned_string user_agent, bool connection_is_encrypted, bool connection_is_incoming, @@ -108,7 +108,7 @@ public: static tr_peerMsgs* create( tr_torrent& torrent, - tr_peer_info* peer_info, + std::shared_ptr peer_info, std::shared_ptr io, tr_interned_string user_agent, tr_peer_callback_bt callback, @@ -146,8 +146,7 @@ protected: } public: - // TODO(tearfur): change this to reference - tr_peer_info* const peer_info; + std::shared_ptr const peer_info; private: static inline auto n_peers = std::atomic{};