diff --git a/libtransmission/peer-mgr.cc b/libtransmission/peer-mgr.cc index b7e59c9f6..bf7c8fb1c 100644 --- a/libtransmission/peer-mgr.cc +++ b/libtransmission/peer-mgr.cc @@ -659,7 +659,7 @@ private: void on_torrent_doomed() { - auto const lock = tor->unique_lock(); + auto const lock = unique_lock(); stop(); tor->swarm = nullptr; delete this; @@ -672,7 +672,7 @@ private: void on_swarm_is_all_seeds() { - auto const lock = tor->unique_lock(); + auto const lock = unique_lock(); for (auto& [socket_address, atom] : connectable_pool) { @@ -1481,7 +1481,7 @@ std::vector tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t address_ty void tr_swarm::on_torrent_started() { - auto const lock = tor->unique_lock(); + auto const lock = unique_lock(); is_running = true; manager->rechokeSoon(); } @@ -2141,28 +2141,28 @@ void enforceSwarmPeerLimit(tr_swarm* swarm, size_t max) std::for_each(std::begin(peers), std::end(peers), close_peer); } -void enforceSessionPeerLimit(tr_session* session) +void enforceSessionPeerLimit(size_t global_peer_limit, tr_torrents& torrents) { - // No need to disconnect if we are under the peer limit - auto const max = session->peerLimit(); - if (tr_peerMsgs::size() <= max) + // if we're under the limit, then no action needed + auto const current_size = tr_peerMsgs::size(); + if (current_size <= global_peer_limit) { return; } - // Make a list of all the peers. + // make a list of all the peers auto peers = std::vector{}; - peers.reserve(tr_peerMsgs::size()); - for (auto const* const tor : session->torrents()) + peers.reserve(current_size); + for (auto const* const tor : torrents) { peers.insert(std::end(peers), std::begin(tor->swarm->peers), std::end(tor->swarm->peers)); } - TR_ASSERT(tr_peerMsgs::size() == std::size(peers)); - if (std::size(peers) > max) + TR_ASSERT(current_size == std::size(peers)); + if (std::size(peers) > global_peer_limit) { - std::partial_sort(std::begin(peers), std::begin(peers) + max, std::end(peers), ComparePeerByMostActive); - std::for_each(std::begin(peers) + max, std::end(peers), close_peer); + std::partial_sort(std::begin(peers), std::begin(peers) + global_peer_limit, std::end(peers), ComparePeerByMostActive); + std::for_each(std::begin(peers) + global_peer_limit, std::end(peers), close_peer); } } } // namespace disconnect_helpers @@ -2177,7 +2177,8 @@ void tr_peerMgr::reconnectPulse() // remove crappy peers auto bad_peers_buf = bad_peers_t{}; - for (auto* const tor : session->torrents()) + auto& torrents = session->torrents(); + for (auto* const tor : torrents) { auto* const swarm = tor->swarm; @@ -2192,7 +2193,7 @@ void tr_peerMgr::reconnectPulse() } // if we're over the per-torrent peer limits, cull some peers - for (auto* const tor : session->torrents()) + for (auto* const tor : torrents) { if (tor->is_running()) { @@ -2201,7 +2202,7 @@ void tr_peerMgr::reconnectPulse() } // if we're over the per-session peer limits, cull some peers - enforceSessionPeerLimit(session); + enforceSessionPeerLimit(session->peerLimit(), torrents); // try to make new peer connections make_new_peer_connections(); @@ -2213,7 +2214,6 @@ namespace { namespace bandwidth_helpers { - void pumpAllPeers(tr_peerMgr* mgr) { for (auto* const tor : mgr->session->torrents()) @@ -2224,25 +2224,6 @@ void pumpAllPeers(tr_peerMgr* mgr) } } } - -void queuePulse(tr_session* session, tr_direction dir) -{ - TR_ASSERT(session != nullptr); - TR_ASSERT(tr_isDirection(dir)); - - if (!session->queueEnabled(dir)) - { - return; - } - - auto const n = session->countQueueFreeSlots(dir); - for (auto* tor : session->getNextQueuedTorrents(dir, n)) - { - tr_torrentStartNow(tor); - session->onQueuedTorrentStarted(tor); - } -} - } // namespace bandwidth_helpers } // namespace @@ -2265,10 +2246,6 @@ void tr_peerMgr::bandwidthPulse() tr_torrentMagnetDoIdleWork(tor); } - /* pump the queues */ - queuePulse(session, TR_UP); - queuePulse(session, TR_DOWN); - reconnectPulse(); } @@ -2406,7 +2383,7 @@ struct peer_candidate return score; } -void get_peer_candidates(tr_session* session, tr_peerMgr::OutboundCandidates& setme) +void get_peer_candidates(size_t global_peer_limit, tr_torrents& torrents, tr_peerMgr::OutboundCandidates& setme) { setme.clear(); @@ -2414,7 +2391,7 @@ void get_peer_candidates(tr_session* session, tr_peerMgr::OutboundCandidates& se auto const now_msec = tr_time_msec(); // leave 5% of connection slots for incoming connections -- ticket #2609 - if (auto const max_candidates = static_cast(session->peerLimit() * 0.95); max_candidates <= tr_peerMsgs::size()) + if (auto const max_candidates = static_cast(global_peer_limit * 0.95); max_candidates <= tr_peerMsgs::size()) { return; } @@ -2424,7 +2401,7 @@ void get_peer_candidates(tr_session* session, tr_peerMgr::OutboundCandidates& se /* populate the candidate array */ auto salter = tr_salt_shaker{}; - for (auto* const tor : session->torrents()) + for (auto* const tor : torrents) { auto* const swarm = tor->swarm; @@ -2535,7 +2512,7 @@ void tr_peerMgr::make_new_peer_connections() auto& candidates = outbound_candidates_; if (std::empty(candidates)) { - get_peer_candidates(session, candidates); + get_peer_candidates(session->peerLimit(), session->torrents(), candidates); } // initiate connections to the last N candidates diff --git a/libtransmission/session.cc b/libtransmission/session.cc index e17308a62..963121195 100644 --- a/libtransmission/session.cc +++ b/libtransmission/session.cc @@ -576,7 +576,7 @@ tr_session* tr_sessionInit(char const* config_dir, bool message_queueing_enabled return session; } -void tr_session::onNowTimer() +void tr_session::on_now_timer() { TR_ASSERT(now_timer_); auto const now = std::chrono::system_clock::now(); @@ -595,6 +595,121 @@ void tr_session::onNowTimer() now_timer_->set_interval(std::chrono::duration_cast(target_interval)); } +namespace +{ +namespace queue_helpers +{ +std::vector get_next_queued_torrents(tr_torrents& torrents, tr_direction dir, size_t num_wanted) +{ + TR_ASSERT(tr_isDirection(dir)); + + // build an array of the candidates + auto candidates = std::vector{}; + candidates.reserve(std::size(torrents)); + for (auto* const tor : torrents) + { + if (tor->is_queued() && (dir == tor->queue_direction())) + { + candidates.push_back(tor); + } + } + + // find the best n candidates + num_wanted = std::min(num_wanted, std::size(candidates)); + if (num_wanted < candidates.size()) + { + std::partial_sort( + std::begin(candidates), + std::begin(candidates) + num_wanted, + std::end(candidates), + [](auto const* a, auto const* b) { return tr_torrentGetQueuePosition(a) < tr_torrentGetQueuePosition(b); }); + candidates.resize(num_wanted); + } + + return candidates; +} +} // namespace queue_helpers +} // namespace + +size_t tr_session::count_queue_free_slots(tr_direction dir) const noexcept +{ + if (!queueEnabled(dir)) + { + return std::numeric_limits::max(); + } + + auto const max = queueSize(dir); + auto const activity = dir == TR_UP ? TR_STATUS_SEED : TR_STATUS_DOWNLOAD; + + // count how many torrents are active + auto active_count = size_t{}; + auto const stalled_enabled = queueStalledEnabled(); + auto const stalled_if_idle_for_n_seconds = queueStalledMinutes() * 60; + auto const now = tr_time(); + for (auto const* const tor : torrents()) + { + /* is it the right activity? */ + if (activity != tor->activity()) + { + continue; + } + + /* is it stalled? */ + if (stalled_enabled && difftime(now, std::max(tor->startDate, tor->activityDate)) >= stalled_if_idle_for_n_seconds) + { + continue; + } + + ++active_count; + + /* if we've reached the limit, no need to keep counting */ + if (active_count >= max) + { + return 0; + } + } + + return max - active_count; +} + +void tr_session::on_queue_timer() +{ + using namespace queue_helpers; + + for (auto const dir : { TR_UP, TR_DOWN }) + { + if (!queueEnabled(dir)) + { + continue; + } + + auto const n_wanted = count_queue_free_slots(dir); + + for (auto* tor : get_next_queued_torrents(torrents(), dir, n_wanted)) + { + tr_torrentStartNow(tor); + + if (queue_start_callback_ != nullptr) + { + queue_start_callback_(this, tor, queue_start_user_data_); + } + } + } +} + +// Periodically save the .resume files of any torrents whose +// status has recently changed. This prevents loss of metadata +// in the case of a crash, unclean shutdown, clumsy user, etc. +void tr_session::on_save_timer() +{ + for (auto* const tor : torrents()) + { + tr_torrentSave(tor); + } + + stats().save(); +} + void tr_session::initImpl(init_data& data) { auto lock = unique_lock(); @@ -1217,6 +1332,7 @@ void tr_session::closeImplPart1(std::promise* closed_promise, std::chrono: utp_timer.reset(); verifier_.reset(); save_timer_.reset(); + queue_timer_.reset(); now_timer_.reset(); rpc_server_.reset(); dht_.reset(); @@ -1931,79 +2047,6 @@ bool tr_sessionGetAntiBruteForceEnabled(tr_session const* session) // --- -std::vector tr_session::getNextQueuedTorrents(tr_direction dir, size_t num_wanted) const -{ - TR_ASSERT(tr_isDirection(dir)); - - // build an array of the candidates - auto candidates = std::vector{}; - candidates.reserve(std::size(torrents())); - for (auto* const tor : torrents()) - { - if (tor->is_queued() && (dir == tor->queue_direction())) - { - candidates.push_back(tor); - } - } - - // find the best n candidates - num_wanted = std::min(num_wanted, std::size(candidates)); - if (num_wanted < candidates.size()) - { - std::partial_sort( - std::begin(candidates), - std::begin(candidates) + num_wanted, - std::end(candidates), - [](auto const* a, auto const* b) { return tr_torrentGetQueuePosition(a) < tr_torrentGetQueuePosition(b); }); - candidates.resize(num_wanted); - } - - return candidates; -} - -size_t tr_session::countQueueFreeSlots(tr_direction dir) const noexcept -{ - if (!queueEnabled(dir)) - { - return std::numeric_limits::max(); - } - - auto const max = queueSize(dir); - auto const activity = dir == TR_UP ? TR_STATUS_SEED : TR_STATUS_DOWNLOAD; - - /* count how many torrents are active */ - auto active_count = size_t{}; - bool const stalled_enabled = queueStalledEnabled(); - auto const stalled_if_idle_for_n_seconds = queueStalledMinutes() * 60; - time_t const now = tr_time(); - for (auto const* const tor : torrents()) - { - /* is it the right activity? */ - if (activity != tor->activity()) - { - continue; - } - - /* is it stalled? */ - if (stalled_enabled && difftime(now, std::max(tor->startDate, tor->activityDate)) >= stalled_if_idle_for_n_seconds) - { - continue; - } - - ++active_count; - - /* if we've reached the limit, no need to keep counting */ - if (active_count >= max) - { - return 0; - } - } - - return max - active_count; -} - -// --- - void tr_session::verify_remove(tr_torrent const* const tor) { if (verifier_) @@ -2076,9 +2119,12 @@ void tr_sessionClearStats(tr_session* session) session->stats().clear(); } +// --- + namespace { -auto constexpr SaveIntervalSecs = 360s; +auto constexpr QueueInterval = 1s; +auto constexpr SaveInterval = 360s; auto makeResumeDir(std::string_view config_dir) { @@ -2108,7 +2154,6 @@ auto makeBlocklistDir(std::string_view config_dir) tr_sys_dir_create(dir.c_str(), TR_SYS_DIR_CREATE_PARENTS, 0777); return dir; } - } // namespace tr_session::tr_session(std::string_view config_dir, tr_variant const& settings_dict) @@ -2122,24 +2167,13 @@ tr_session::tr_session(std::string_view config_dir, tr_variant const& settings_d , session_id_{ tr_time } , peer_mgr_{ tr_peerMgrNew(this), &tr_peerMgrFree } , rpc_server_{ std::make_unique(this, settings_dict) } + , now_timer_{ timer_maker_->create([this]() { on_now_timer(); }) } + , queue_timer_{ timer_maker_->create([this]() { on_queue_timer(); }) } + , save_timer_{ timer_maker_->create([this]() { on_save_timer(); }) } { - now_timer_ = timerMaker().create([this]() { onNowTimer(); }); now_timer_->start_repeating(1s); - - // Periodically save the .resume files of any torrents whose - // status has recently changed. This prevents loss of metadata - // in the case of a crash, unclean shutdown, clumsy user, etc. - save_timer_ = timerMaker().create( - [this]() - { - for (auto* const tor : torrents()) - { - tr_torrentSave(tor); - } - - stats().save(); - }); - save_timer_->start_repeating(SaveIntervalSecs); + queue_timer_->start_repeating(QueueInterval); + save_timer_->start_repeating(SaveInterval); } void tr_session::addIncoming(tr_peer_socket&& socket) diff --git a/libtransmission/session.h b/libtransmission/session.h index ac213ab5b..258be8e56 100644 --- a/libtransmission/session.h +++ b/libtransmission/session.h @@ -586,14 +586,6 @@ public: queue_start_user_data_ = user_data; } - void onQueuedTorrentStarted(tr_torrent* tor) - { - if (queue_start_callback_ != nullptr) - { - queue_start_callback_(this, tor, queue_start_user_data_); - } - } - constexpr void setIdleLimitHitCallback(tr_session_idle_limit_hit_func cb, void* user_data) { idle_limit_hit_callback_ = cb; @@ -820,9 +812,7 @@ public: return TR_RPC_OK; } - [[nodiscard]] size_t countQueueFreeSlots(tr_direction dir) const noexcept; - - [[nodiscard]] std::vector getNextQueuedTorrents(tr_direction dir, size_t num_wanted) const; + [[nodiscard]] size_t count_queue_free_slots(tr_direction dir) const noexcept; [[nodiscard]] bool addressIsBlocked(tr_address const& addr) const noexcept; @@ -959,7 +949,9 @@ private: void closeImplPart1(std::promise* closed_promise, std::chrono::time_point deadline); void closeImplPart2(std::promise* closed_promise, std::chrono::time_point deadline); - void onNowTimer(); + void on_now_timer(); + void on_queue_timer(); + void on_save_timer(); static void onIncomingPeerConnection(tr_socket_t fd, void* vsession); @@ -1200,6 +1192,9 @@ private: // depends-on: alt_speeds_, udp_core_, torrents_ std::unique_ptr now_timer_; + // depends-on: torrents_ + std::unique_ptr queue_timer_; + // depends-on: torrents_ std::unique_ptr save_timer_; diff --git a/libtransmission/torrent.cc b/libtransmission/torrent.cc index d0464eb1c..6e03fb6fe 100644 --- a/libtransmission/torrent.cc +++ b/libtransmission/torrent.cc @@ -644,7 +644,7 @@ bool torrentShouldQueue(tr_torrent const* const tor) { tr_direction const dir = tor->queue_direction(); - return tor->session->countQueueFreeSlots(dir) == 0; + return tor->session->count_queue_free_slots(dir) == 0; } void torrentResetTransferStats(tr_torrent* tor)