refactor: directly check if announcers are idle instead of using a stop counter (#6237)

This commit is contained in:
Yat Ho 2023-11-13 11:43:43 +08:00 committed by GitHub
parent 8ebb5b0bc3
commit 90e91d6284
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 39 additions and 52 deletions

View File

@ -354,7 +354,7 @@ struct tau_tracker
}
// are there any requests pending?
if (this->isIdle())
if (this->is_idle())
{
return;
}
@ -402,6 +402,11 @@ struct tau_tracker
}
}
[[nodiscard]] bool is_idle() const noexcept
{
return std::empty(announces) && std::empty(scrapes) && !addr_pending_dns_;
}
private:
using Sockaddr = std::pair<sockaddr_storage, socklen_t>;
using MaybeSockaddr = std::optional<Sockaddr>;
@ -444,11 +449,6 @@ private:
return std::make_pair(ss, len);
}
[[nodiscard]] bool isIdle() const noexcept
{
return std::empty(announces) && std::empty(scrapes) && !addr_pending_dns_;
}
void failAll(bool did_connect, bool did_timeout, std::string_view errmsg)
{
for (auto& req : this->scrapes)
@ -690,6 +690,11 @@ public:
return false;
}
[[nodiscard]] bool is_idle() const noexcept override
{
return std::all_of(std::begin(trackers_), std::end(trackers_), [](auto const& tracker) { return tracker.is_idle(); });
}
private:
// Finds the tau_tracker struct that corresponds to this url.
// If it doesn't exist yet, create one.

View File

@ -6,7 +6,7 @@
#include <algorithm>
#include <array>
#include <chrono> // operator""ms
#include <cstdio>
#include <cstddef> // size_t
#include <ctime>
#include <deque>
#include <iterator>
@ -115,11 +115,10 @@ struct tr_scrape_info
class tr_announcer_impl final : public tr_announcer
{
public:
explicit tr_announcer_impl(tr_session* session_in, tr_announcer_udp& announcer_udp, std::atomic<size_t>& n_pending_stops)
explicit tr_announcer_impl(tr_session* session_in, tr_announcer_udp& announcer_udp)
: session{ session_in }
, announcer_udp_{ announcer_udp }
, upkeep_timer_{ session_in->timerMaker().create() }
, n_pending_stops_{ n_pending_stops }
{
upkeep_timer_->set_callback([this]() { this->upkeep(); });
upkeep_timer_->start_repeating(UpkeepInterval);
@ -147,7 +146,7 @@ public:
flushCloseMessages();
}
void upkeep();
void upkeep() override;
void onAnnounceDone(int tier_id, tr_announce_event event, bool is_running_on_success, tr_announce_response const& response);
void onScrapeDone(tr_scrape_response const& response);
@ -184,22 +183,7 @@ public:
void announce(tr_announce_request const& request, tr_announce_response_func on_response)
{
auto const is_stop = request.event == TR_ANNOUNCE_EVENT_STOPPED;
TR_ASSERT(!is_shutting_down_ || is_stop);
if (is_stop)
{
++n_pending_stops_;
on_response =
[&n_stops = n_pending_stops_, on_response = std::move(on_response)](tr_announce_response const& response)
{
TR_ASSERT(n_stops > 0U);
--n_stops;
on_response(response);
};
}
TR_ASSERT(!is_shutting_down_ || request.event == TR_ANNOUNCE_EVENT_STOPPED);
if (auto const announce_sv = request.announce_url.sv();
tr_strv_starts_with(announce_sv, "http://"sv) || tr_strv_starts_with(announce_sv, "https://"sv))
@ -239,19 +223,13 @@ private:
std::set<tr_announce_request, StopsCompare> stops_;
std::atomic<size_t>& n_pending_stops_;
bool is_shutting_down_ = false;
};
std::unique_ptr<tr_announcer> tr_announcer::create(
tr_session* session,
tr_announcer_udp& announcer_udp,
std::atomic<size_t>& n_pending_stops)
std::unique_ptr<tr_announcer> tr_announcer::create(tr_session* session, tr_announcer_udp& announcer_udp)
{
TR_ASSERT(session != nullptr);
return std::make_unique<tr_announcer_impl>(session, announcer_udp, n_pending_stops);
return std::make_unique<tr_announcer_impl>(session, announcer_udp);
}
// ---

View File

@ -9,7 +9,6 @@
#error only libtransmission should #include this header.
#endif
#include <atomic>
#include <cstddef> // size_t
#include <cstdint> // uint32_t
#include <ctime>
@ -70,10 +69,7 @@ using tr_tracker_callback = std::function<void(tr_torrent&, tr_tracker_event con
class tr_announcer
{
public:
[[nodiscard]] static std::unique_ptr<tr_announcer> create(
tr_session* session,
tr_announcer_udp&,
std::atomic<size_t>& n_pending_stops);
[[nodiscard]] static std::unique_ptr<tr_announcer> create(tr_session* session, tr_announcer_udp&);
virtual ~tr_announcer() = default;
virtual tr_torrent_announcer* addTorrent(tr_torrent*, tr_tracker_callback callback) = 0;
@ -82,6 +78,8 @@ public:
virtual void resetTorrent(tr_torrent* tor) = 0;
virtual void removeTorrent(tr_torrent* tor) = 0;
virtual void startShutdown() = 0;
virtual void upkeep() = 0;
};
std::unique_ptr<tr_announcer> tr_announcerCreate(tr_session* session);
@ -159,4 +157,6 @@ public:
// @brief process an incoming udp message if it's a tracker response.
// @return true if msg was a tracker response; false otherwise
virtual bool handle_message(uint8_t const* msg, size_t msglen) = 0;
[[nodiscard]] virtual bool is_idle() const noexcept = 0;
};

View File

@ -1378,13 +1378,14 @@ void tr_session::closeImplPart1(std::promise<void>* closed_promise, std::chrono:
void tr_session::closeImplPart2(std::promise<void>* closed_promise, std::chrono::time_point<std::chrono::steady_clock> deadline)
{
// try to keep the UDP announcer alive long enough to send out
// try to keep web_ and the UDP announcer alive long enough to send out
// all the &event=stopped tracker announces.
// also wait for all ip cache updates to finish so that web_ can
// safely destruct.
if ((n_pending_stops_ != 0U || !global_ip_cache_->try_shutdown()) && std::chrono::steady_clock::now() < deadline)
if ((!web_->is_idle() || !announcer_udp_->is_idle() || !global_ip_cache_->try_shutdown()) &&
std::chrono::steady_clock::now() < deadline)
{
announcer_udp_->upkeep();
announcer_->upkeep();
return;
}

View File

@ -1097,10 +1097,6 @@ private:
/// fields that aren't trivial,
/// but are self-contained / don't hold references to others
// used during shutdown:
// how many &event=stopped announces are still being sent to trackers
std::atomic<size_t> n_pending_stops_ = {};
mutable std::recursive_mutex session_mutex_;
tr_stats session_stats_{ config_dir_, time(nullptr) };
@ -1183,7 +1179,7 @@ public:
std::unique_ptr<tr_announcer_udp> announcer_udp_ = tr_announcer_udp::create(announcer_udp_mediator_);
// depends-on: settings_, torrents_, web_, announcer_udp_
std::unique_ptr<tr_announcer> announcer_ = tr_announcer::create(this, *announcer_udp_, n_pending_stops_);
std::unique_ptr<tr_announcer> announcer_ = tr_announcer::create(this, *announcer_udp_);
// depends-on: public_peer_port_, udp_core_, dht_mediator_
std::unique_ptr<tr_dht> dht_;

View File

@ -227,6 +227,11 @@ public:
queued_tasks_cv_.notify_one();
}
[[nodiscard]] bool is_idle() const noexcept
{
return std::empty(queued_tasks_) && std::empty(running_tasks_);
}
class Task
{
public:
@ -621,11 +626,6 @@ public:
}
}
[[nodiscard]] bool is_idle() const noexcept
{
return std::empty(queued_tasks_) && std::empty(running_tasks_);
}
void remove_task(Task const& task)
{
auto const lock = std::unique_lock{ tasks_mutex_ };
@ -809,3 +809,8 @@ void tr_web::startShutdown(std::chrono::milliseconds deadline)
{
impl_->startShutdown(deadline);
}
bool tr_web::is_idle() const noexcept
{
return impl_->is_idle();
}

View File

@ -100,6 +100,8 @@ public:
// are left alone so that they can finish.
void startShutdown(std::chrono::milliseconds /*deadline*/);
[[nodiscard]] bool is_idle() const noexcept;
// If you want to give running tasks a chance to finish, call closeSoon()
// before destroying the tr_web object. Deleting the object will cancel
// all of its tasks.