refactor: reduce coupling between tr_session and tr_peerMgr (#6151)

This commit is contained in:
Charles Kerr 2023-10-23 11:34:30 -04:00 committed by GitHub
parent fab21fc9c2
commit c0e5e3a368
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 156 additions and 150 deletions

View File

@ -659,7 +659,7 @@ private:
void on_torrent_doomed()
{
auto const lock = tor->unique_lock();
auto const lock = unique_lock();
stop();
tor->swarm = nullptr;
delete this;
@ -672,7 +672,7 @@ private:
void on_swarm_is_all_seeds()
{
auto const lock = tor->unique_lock();
auto const lock = unique_lock();
for (auto& [socket_address, atom] : connectable_pool)
{
@ -1481,7 +1481,7 @@ std::vector<tr_pex> tr_peerMgrGetPeers(tr_torrent const* tor, uint8_t address_ty
void tr_swarm::on_torrent_started()
{
auto const lock = tor->unique_lock();
auto const lock = unique_lock();
is_running = true;
manager->rechokeSoon();
}
@ -2141,28 +2141,28 @@ void enforceSwarmPeerLimit(tr_swarm* swarm, size_t max)
std::for_each(std::begin(peers), std::end(peers), close_peer);
}
void enforceSessionPeerLimit(tr_session* session)
void enforceSessionPeerLimit(size_t global_peer_limit, tr_torrents& torrents)
{
// No need to disconnect if we are under the peer limit
auto const max = session->peerLimit();
if (tr_peerMsgs::size() <= max)
// if we're under the limit, then no action needed
auto const current_size = tr_peerMsgs::size();
if (current_size <= global_peer_limit)
{
return;
}
// Make a list of all the peers.
// make a list of all the peers
auto peers = std::vector<tr_peerMsgs*>{};
peers.reserve(tr_peerMsgs::size());
for (auto const* const tor : session->torrents())
peers.reserve(current_size);
for (auto const* const tor : torrents)
{
peers.insert(std::end(peers), std::begin(tor->swarm->peers), std::end(tor->swarm->peers));
}
TR_ASSERT(tr_peerMsgs::size() == std::size(peers));
if (std::size(peers) > max)
TR_ASSERT(current_size == std::size(peers));
if (std::size(peers) > global_peer_limit)
{
std::partial_sort(std::begin(peers), std::begin(peers) + max, std::end(peers), ComparePeerByMostActive);
std::for_each(std::begin(peers) + max, std::end(peers), close_peer);
std::partial_sort(std::begin(peers), std::begin(peers) + global_peer_limit, std::end(peers), ComparePeerByMostActive);
std::for_each(std::begin(peers) + global_peer_limit, std::end(peers), close_peer);
}
}
} // namespace disconnect_helpers
@ -2177,7 +2177,8 @@ void tr_peerMgr::reconnectPulse()
// remove crappy peers
auto bad_peers_buf = bad_peers_t{};
for (auto* const tor : session->torrents())
auto& torrents = session->torrents();
for (auto* const tor : torrents)
{
auto* const swarm = tor->swarm;
@ -2192,7 +2193,7 @@ void tr_peerMgr::reconnectPulse()
}
// if we're over the per-torrent peer limits, cull some peers
for (auto* const tor : session->torrents())
for (auto* const tor : torrents)
{
if (tor->is_running())
{
@ -2201,7 +2202,7 @@ void tr_peerMgr::reconnectPulse()
}
// if we're over the per-session peer limits, cull some peers
enforceSessionPeerLimit(session);
enforceSessionPeerLimit(session->peerLimit(), torrents);
// try to make new peer connections
make_new_peer_connections();
@ -2213,7 +2214,6 @@ namespace
{
namespace bandwidth_helpers
{
void pumpAllPeers(tr_peerMgr* mgr)
{
for (auto* const tor : mgr->session->torrents())
@ -2224,25 +2224,6 @@ void pumpAllPeers(tr_peerMgr* mgr)
}
}
}
void queuePulse(tr_session* session, tr_direction dir)
{
TR_ASSERT(session != nullptr);
TR_ASSERT(tr_isDirection(dir));
if (!session->queueEnabled(dir))
{
return;
}
auto const n = session->countQueueFreeSlots(dir);
for (auto* tor : session->getNextQueuedTorrents(dir, n))
{
tr_torrentStartNow(tor);
session->onQueuedTorrentStarted(tor);
}
}
} // namespace bandwidth_helpers
} // namespace
@ -2265,10 +2246,6 @@ void tr_peerMgr::bandwidthPulse()
tr_torrentMagnetDoIdleWork(tor);
}
/* pump the queues */
queuePulse(session, TR_UP);
queuePulse(session, TR_DOWN);
reconnectPulse();
}
@ -2406,7 +2383,7 @@ struct peer_candidate
return score;
}
void get_peer_candidates(tr_session* session, tr_peerMgr::OutboundCandidates& setme)
void get_peer_candidates(size_t global_peer_limit, tr_torrents& torrents, tr_peerMgr::OutboundCandidates& setme)
{
setme.clear();
@ -2414,7 +2391,7 @@ void get_peer_candidates(tr_session* session, tr_peerMgr::OutboundCandidates& se
auto const now_msec = tr_time_msec();
// leave 5% of connection slots for incoming connections -- ticket #2609
if (auto const max_candidates = static_cast<size_t>(session->peerLimit() * 0.95); max_candidates <= tr_peerMsgs::size())
if (auto const max_candidates = static_cast<size_t>(global_peer_limit * 0.95); max_candidates <= tr_peerMsgs::size())
{
return;
}
@ -2424,7 +2401,7 @@ void get_peer_candidates(tr_session* session, tr_peerMgr::OutboundCandidates& se
/* populate the candidate array */
auto salter = tr_salt_shaker{};
for (auto* const tor : session->torrents())
for (auto* const tor : torrents)
{
auto* const swarm = tor->swarm;
@ -2535,7 +2512,7 @@ void tr_peerMgr::make_new_peer_connections()
auto& candidates = outbound_candidates_;
if (std::empty(candidates))
{
get_peer_candidates(session, candidates);
get_peer_candidates(session->peerLimit(), session->torrents(), candidates);
}
// initiate connections to the last N candidates

View File

@ -576,7 +576,7 @@ tr_session* tr_sessionInit(char const* config_dir, bool message_queueing_enabled
return session;
}
void tr_session::onNowTimer()
void tr_session::on_now_timer()
{
TR_ASSERT(now_timer_);
auto const now = std::chrono::system_clock::now();
@ -595,6 +595,121 @@ void tr_session::onNowTimer()
now_timer_->set_interval(std::chrono::duration_cast<std::chrono::milliseconds>(target_interval));
}
namespace
{
namespace queue_helpers
{
std::vector<tr_torrent*> get_next_queued_torrents(tr_torrents& torrents, tr_direction dir, size_t num_wanted)
{
TR_ASSERT(tr_isDirection(dir));
// build an array of the candidates
auto candidates = std::vector<tr_torrent*>{};
candidates.reserve(std::size(torrents));
for (auto* const tor : torrents)
{
if (tor->is_queued() && (dir == tor->queue_direction()))
{
candidates.push_back(tor);
}
}
// find the best n candidates
num_wanted = std::min(num_wanted, std::size(candidates));
if (num_wanted < candidates.size())
{
std::partial_sort(
std::begin(candidates),
std::begin(candidates) + num_wanted,
std::end(candidates),
[](auto const* a, auto const* b) { return tr_torrentGetQueuePosition(a) < tr_torrentGetQueuePosition(b); });
candidates.resize(num_wanted);
}
return candidates;
}
} // namespace queue_helpers
} // namespace
size_t tr_session::count_queue_free_slots(tr_direction dir) const noexcept
{
if (!queueEnabled(dir))
{
return std::numeric_limits<size_t>::max();
}
auto const max = queueSize(dir);
auto const activity = dir == TR_UP ? TR_STATUS_SEED : TR_STATUS_DOWNLOAD;
// count how many torrents are active
auto active_count = size_t{};
auto const stalled_enabled = queueStalledEnabled();
auto const stalled_if_idle_for_n_seconds = queueStalledMinutes() * 60;
auto const now = tr_time();
for (auto const* const tor : torrents())
{
/* is it the right activity? */
if (activity != tor->activity())
{
continue;
}
/* is it stalled? */
if (stalled_enabled && difftime(now, std::max(tor->startDate, tor->activityDate)) >= stalled_if_idle_for_n_seconds)
{
continue;
}
++active_count;
/* if we've reached the limit, no need to keep counting */
if (active_count >= max)
{
return 0;
}
}
return max - active_count;
}
void tr_session::on_queue_timer()
{
using namespace queue_helpers;
for (auto const dir : { TR_UP, TR_DOWN })
{
if (!queueEnabled(dir))
{
continue;
}
auto const n_wanted = count_queue_free_slots(dir);
for (auto* tor : get_next_queued_torrents(torrents(), dir, n_wanted))
{
tr_torrentStartNow(tor);
if (queue_start_callback_ != nullptr)
{
queue_start_callback_(this, tor, queue_start_user_data_);
}
}
}
}
// Periodically save the .resume files of any torrents whose
// status has recently changed. This prevents loss of metadata
// in the case of a crash, unclean shutdown, clumsy user, etc.
void tr_session::on_save_timer()
{
for (auto* const tor : torrents())
{
tr_torrentSave(tor);
}
stats().save();
}
void tr_session::initImpl(init_data& data)
{
auto lock = unique_lock();
@ -1217,6 +1332,7 @@ void tr_session::closeImplPart1(std::promise<void>* closed_promise, std::chrono:
utp_timer.reset();
verifier_.reset();
save_timer_.reset();
queue_timer_.reset();
now_timer_.reset();
rpc_server_.reset();
dht_.reset();
@ -1931,79 +2047,6 @@ bool tr_sessionGetAntiBruteForceEnabled(tr_session const* session)
// ---
std::vector<tr_torrent*> tr_session::getNextQueuedTorrents(tr_direction dir, size_t num_wanted) const
{
TR_ASSERT(tr_isDirection(dir));
// build an array of the candidates
auto candidates = std::vector<tr_torrent*>{};
candidates.reserve(std::size(torrents()));
for (auto* const tor : torrents())
{
if (tor->is_queued() && (dir == tor->queue_direction()))
{
candidates.push_back(tor);
}
}
// find the best n candidates
num_wanted = std::min(num_wanted, std::size(candidates));
if (num_wanted < candidates.size())
{
std::partial_sort(
std::begin(candidates),
std::begin(candidates) + num_wanted,
std::end(candidates),
[](auto const* a, auto const* b) { return tr_torrentGetQueuePosition(a) < tr_torrentGetQueuePosition(b); });
candidates.resize(num_wanted);
}
return candidates;
}
size_t tr_session::countQueueFreeSlots(tr_direction dir) const noexcept
{
if (!queueEnabled(dir))
{
return std::numeric_limits<size_t>::max();
}
auto const max = queueSize(dir);
auto const activity = dir == TR_UP ? TR_STATUS_SEED : TR_STATUS_DOWNLOAD;
/* count how many torrents are active */
auto active_count = size_t{};
bool const stalled_enabled = queueStalledEnabled();
auto const stalled_if_idle_for_n_seconds = queueStalledMinutes() * 60;
time_t const now = tr_time();
for (auto const* const tor : torrents())
{
/* is it the right activity? */
if (activity != tor->activity())
{
continue;
}
/* is it stalled? */
if (stalled_enabled && difftime(now, std::max(tor->startDate, tor->activityDate)) >= stalled_if_idle_for_n_seconds)
{
continue;
}
++active_count;
/* if we've reached the limit, no need to keep counting */
if (active_count >= max)
{
return 0;
}
}
return max - active_count;
}
// ---
void tr_session::verify_remove(tr_torrent const* const tor)
{
if (verifier_)
@ -2076,9 +2119,12 @@ void tr_sessionClearStats(tr_session* session)
session->stats().clear();
}
// ---
namespace
{
auto constexpr SaveIntervalSecs = 360s;
auto constexpr QueueInterval = 1s;
auto constexpr SaveInterval = 360s;
auto makeResumeDir(std::string_view config_dir)
{
@ -2108,7 +2154,6 @@ auto makeBlocklistDir(std::string_view config_dir)
tr_sys_dir_create(dir.c_str(), TR_SYS_DIR_CREATE_PARENTS, 0777);
return dir;
}
} // namespace
tr_session::tr_session(std::string_view config_dir, tr_variant const& settings_dict)
@ -2122,24 +2167,13 @@ tr_session::tr_session(std::string_view config_dir, tr_variant const& settings_d
, session_id_{ tr_time }
, peer_mgr_{ tr_peerMgrNew(this), &tr_peerMgrFree }
, rpc_server_{ std::make_unique<tr_rpc_server>(this, settings_dict) }
, now_timer_{ timer_maker_->create([this]() { on_now_timer(); }) }
, queue_timer_{ timer_maker_->create([this]() { on_queue_timer(); }) }
, save_timer_{ timer_maker_->create([this]() { on_save_timer(); }) }
{
now_timer_ = timerMaker().create([this]() { onNowTimer(); });
now_timer_->start_repeating(1s);
// Periodically save the .resume files of any torrents whose
// status has recently changed. This prevents loss of metadata
// in the case of a crash, unclean shutdown, clumsy user, etc.
save_timer_ = timerMaker().create(
[this]()
{
for (auto* const tor : torrents())
{
tr_torrentSave(tor);
}
stats().save();
});
save_timer_->start_repeating(SaveIntervalSecs);
queue_timer_->start_repeating(QueueInterval);
save_timer_->start_repeating(SaveInterval);
}
void tr_session::addIncoming(tr_peer_socket&& socket)

View File

@ -586,14 +586,6 @@ public:
queue_start_user_data_ = user_data;
}
void onQueuedTorrentStarted(tr_torrent* tor)
{
if (queue_start_callback_ != nullptr)
{
queue_start_callback_(this, tor, queue_start_user_data_);
}
}
constexpr void setIdleLimitHitCallback(tr_session_idle_limit_hit_func cb, void* user_data)
{
idle_limit_hit_callback_ = cb;
@ -820,9 +812,7 @@ public:
return TR_RPC_OK;
}
[[nodiscard]] size_t countQueueFreeSlots(tr_direction dir) const noexcept;
[[nodiscard]] std::vector<tr_torrent*> getNextQueuedTorrents(tr_direction dir, size_t num_wanted) const;
[[nodiscard]] size_t count_queue_free_slots(tr_direction dir) const noexcept;
[[nodiscard]] bool addressIsBlocked(tr_address const& addr) const noexcept;
@ -959,7 +949,9 @@ private:
void closeImplPart1(std::promise<void>* closed_promise, std::chrono::time_point<std::chrono::steady_clock> deadline);
void closeImplPart2(std::promise<void>* closed_promise, std::chrono::time_point<std::chrono::steady_clock> deadline);
void onNowTimer();
void on_now_timer();
void on_queue_timer();
void on_save_timer();
static void onIncomingPeerConnection(tr_socket_t fd, void* vsession);
@ -1200,6 +1192,9 @@ private:
// depends-on: alt_speeds_, udp_core_, torrents_
std::unique_ptr<libtransmission::Timer> now_timer_;
// depends-on: torrents_
std::unique_ptr<libtransmission::Timer> queue_timer_;
// depends-on: torrents_
std::unique_ptr<libtransmission::Timer> save_timer_;

View File

@ -644,7 +644,7 @@ bool torrentShouldQueue(tr_torrent const* const tor)
{
tr_direction const dir = tor->queue_direction();
return tor->session->countQueueFreeSlots(dir) == 0;
return tor->session->count_queue_free_slots(dir) == 0;
}
void torrentResetTransferStats(tr_torrent* tor)