refactor: add tr_announcer.startShutdown() (#4280)

This commit is contained in:
Charles Kerr 2022-11-29 21:05:11 -06:00 committed by GitHub
parent eeaefca6f3
commit 595d0ac14a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 55 additions and 60 deletions

View File

@ -32,7 +32,6 @@
#include "log.h"
#include "peer-io.h"
#include "peer-mgr.h" // for tr_pex::fromCompact4()
#include "session.h"
#include "tr-assert.h"
#include "tr-buffer.h"
#include "utils.h"
@ -300,11 +299,6 @@ struct tau_tracker
{
}
[[nodiscard]] auto isIdle() const noexcept
{
return std::empty(announces) && std::empty(scrapes) && !addr_pending_dns_;
}
void sendto(std::byte const* buf, size_t buflen)
{
TR_ASSERT(addr_);
@ -341,7 +335,6 @@ struct tau_tracker
void upkeep(bool timeout_reqs = true)
{
time_t const now = tr_time();
bool const closing = this->close_at != 0;
// do we have a DNS request that's ready?
if (addr_pending_dns_ && addr_pending_dns_->wait_for(0ms) == std::future_status::ready)
@ -358,7 +351,7 @@ struct tau_tracker
}
// update the addr if our lookup is past its shelf date
if (!closing && !addr_pending_dns_ && addr_expires_at_ <= now)
if (!addr_pending_dns_ && addr_expires_at_ <= now)
{
addr_.reset();
addr_pending_dns_ = std::async(std::launch::async, lookup, this->host, this->port, this->key);
@ -443,6 +436,11 @@ private:
return std::make_pair(ss, len);
}
[[nodiscard]] bool isIdle() const noexcept
{
return std::empty(announces) && std::empty(scrapes) && !addr_pending_dns_;
}
void failAll(bool did_connect, bool did_timeout, std::string_view errmsg)
{
for (auto& req : this->scrapes)
@ -463,24 +461,22 @@ private:
void timeout_requests(time_t now)
{
bool const cancel_all = this->close_at != 0 && (this->close_at <= now);
if (this->connecting_at != 0 && this->connecting_at + ConnectionRequestTtl < now)
{
auto empty_buf = libtransmission::Buffer{};
on_connection_response(TAU_ACTION_ERROR, empty_buf);
}
timeout_requests(this->announces, now, cancel_all, "announce");
timeout_requests(this->scrapes, now, cancel_all, "scrape");
timeout_requests(this->announces, now, "announce");
timeout_requests(this->scrapes, now, "scrape");
}
template<typename T>
void timeout_requests(std::list<T>& requests, time_t now, bool cancel_all, std::string_view name)
void timeout_requests(std::list<T>& requests, time_t now, std::string_view name)
{
for (auto it = std::begin(requests); it != std::end(requests);)
{
if (auto& req = *it; cancel_all || req.expiresAt() <= now)
if (auto& req = *it; req.expiresAt() <= now)
{
logtrace(this->key, fmt::format("timeout {} req {}", name, fmt::ptr(&req)));
req.fail(false, true, "");
@ -558,8 +554,6 @@ public:
tau_connection_t connection_id = {};
tau_transaction_t connection_transaction_id = {};
time_t close_at = 0;
std::list<tau_announce_request> announces;
std::list<tau_scrape_request> scrapes;
@ -622,25 +616,6 @@ public:
}
}
[[nodiscard]] bool isIdle() const noexcept override
{
return std::all_of(std::begin(trackers_), std::end(trackers_), [](auto const& tracker) { return tracker.isIdle(); });
}
// Start shutting down.
// This doesn't destroy everything if there are requests,
// but sets a deadline on how much longer to wait for the remaining ones.
void startShutdown() override
{
auto const now = time(nullptr);
for (auto& tracker : trackers_)
{
tracker.close_at = now + 3;
tracker.upkeep();
}
}
// @brief process an incoming udp message if it's a tracker response.
// @return true if msg was a tracker response; false otherwise
bool handleMessage(uint8_t const* msg, size_t msglen) override

View File

@ -160,6 +160,17 @@ public:
void resetTorrent(tr_torrent* tor) override;
void removeTorrent(tr_torrent* tor) override;
void startShutdown() override
{
is_shutting_down_ = true;
flushCloseMessages();
}
[[nodiscard]] size_t pendingAnnounces() const override
{
return pending_announces_;
}
void upkeep();
void onAnnounceDone(int tier_id, tr_announce_event event, bool is_running_on_success, tr_announce_response const& response);
@ -178,6 +189,8 @@ public:
void scrape(tr_scrape_request const& request, tr_scrape_response_func on_response)
{
TR_ASSERT(!is_shutting_down_);
if (auto const scrape_sv = request.scrape_url.sv();
tr_strvStartsWith(scrape_sv, "http://"sv) || tr_strvStartsWith(scrape_sv, "https://"sv))
{
@ -195,14 +208,25 @@ public:
void announce(tr_announce_request const& request, tr_announce_response_func on_response)
{
TR_ASSERT(!is_shutting_down_ || request.event == TR_ANNOUNCE_EVENT_STOPPED);
auto callback = [this, on_response = std::move(on_response)](tr_announce_response const& response)
{
TR_ASSERT(pending_announces_ > 0U);
--pending_announces_;
on_response(response);
};
if (auto const announce_sv = request.announce_url.sv();
tr_strvStartsWith(announce_sv, "http://"sv) || tr_strvStartsWith(announce_sv, "https://"sv))
{
tr_tracker_http_announce(session, request, std::move(on_response));
++pending_announces_;
tr_tracker_http_announce(session, request, std::move(callback));
}
else if (tr_strvStartsWith(announce_sv, "udp://"sv))
{
announcer_udp_.announce(request, std::move(on_response));
++pending_announces_;
announcer_udp_.announce(request, std::move(callback));
}
else
{
@ -225,6 +249,8 @@ private:
stops_.clear();
}
static auto constexpr UpkeepInterval = 500ms;
tr_announcer_udp& announcer_udp_;
std::map<tr_interned_string, tr_scrape_info> scrape_info_;
@ -233,7 +259,9 @@ private:
std::set<tr_announce_request, StopsCompare> stops_;
static auto constexpr UpkeepInterval = 500ms;
size_t pending_announces_ = {};
bool is_shutting_down_ = false;
};
std::unique_ptr<tr_announcer> tr_announcer::create(tr_session* session, tr_announcer_udp& announcer_udp)
@ -1546,7 +1574,7 @@ void tr_announcer_impl::upkeep()
flushCloseMessages();
// maybe kick off some scrapes / announces whose time has come
if (!session->isClosing())
if (!is_shutting_down_)
{
scrapeAndAnnounceMore(this);
}

View File

@ -71,6 +71,9 @@ public:
virtual void stopTorrent(tr_torrent* tor) = 0;
virtual void resetTorrent(tr_torrent* tor) = 0;
virtual void removeTorrent(tr_torrent* tor) = 0;
virtual void startShutdown() = 0;
[[nodiscard]] virtual size_t pendingAnnounces() const = 0;
};
std::unique_ptr<tr_announcer> tr_announcerCreate(tr_session* session);
@ -141,16 +144,12 @@ public:
[[nodiscard]] static std::unique_ptr<tr_announcer_udp> create(Mediator&);
[[nodiscard]] virtual bool isIdle() const noexcept = 0;
virtual void announce(tr_announce_request const& request, tr_announce_response_func on_response) = 0;
virtual void scrape(tr_scrape_request const& request, tr_scrape_response_func on_response) = 0;
virtual void upkeep() = 0;
virtual void startShutdown() = 0;
// @brief process an incoming udp message if it's a tracker response.
// @return true if msg was a tracker response; false otherwise
virtual bool handleMessage(uint8_t const* msg, size_t msglen) = 0;

View File

@ -1224,9 +1224,6 @@ void tr_session::closeImplPart1(std::promise<void>* closed_promise)
bound_ipv6_.reset();
bound_ipv4_.reset();
// tell other items to start shutting down
announcer_udp_->startShutdown();
// Close the torrents in order of most active to least active
// so that the most important announce=stopped events are
// fired out first...
@ -1245,14 +1242,14 @@ void tr_session::closeImplPart1(std::promise<void>* closed_promise)
tr_torrentFreeInSessionThread(tor);
}
torrents.clear();
// ...and now that all the torrents have been closed, any
// remaining `event=stopped` announce messages are queued in
// the announcer. The announcer's destructor sends all those
// out via `web_`...
this->announcer_.reset();
// ...and now that those are queued, tell web_ that we're
// shutting down soon. This leaves the `event=stopped` messages
// in the queue but refuses to take any _new_ tasks
// ...now that all the torrents have been closed, any remaining
// `&event=stopped` announce messages are queued in the announcer.
// Tell the announcer to start shutdown, which sends out the stop
// events and stops scraping.
this->announcer_->startShutdown();
// ...and now that those are queued, tell web_ that we're shutting
// down soon. This leaves the `event=stopped` going but refuses any
// new tasks.
this->web_->startShutdown();
this->cache.reset();
@ -1266,7 +1263,7 @@ void tr_session::closeImplPart2(std::promise<void>* closed_promise)
{
// try to keep the UDP announcer alive long enough to send out
// all the &event=stopped tracker announces
if (announcer_udp_ && !announcer_udp_->isIdle())
if (announcer_->pendingAnnounces() != 0U)
{
announcer_udp_->upkeep();
return;
@ -1274,6 +1271,7 @@ void tr_session::closeImplPart2(std::promise<void>* closed_promise)
save_timer_.reset();
this->announcer_.reset();
this->announcer_udp_.reset();
this->udp_core_.reset();

View File

@ -310,22 +310,18 @@ TEST_F(AnnouncerUdpTest, canScrape)
auto [request, expected_response] = buildSimpleScrapeRequestAndResponse();
auto response = std::optional<tr_scrape_response>{};
announcer->scrape(request, [&response](tr_scrape_response const& resp) { response = resp; });
EXPECT_FALSE(announcer->isIdle());
// The announcer should have sent a UDP connection request.
// Inspect that request for validity.
auto sent = waitForAnnouncerToSendMessage(mediator);
EXPECT_FALSE(announcer->isIdle());
auto connect_transaction_id = parseConnectionRequest(sent);
// Have the tracker respond to the request
auto const connection_id = sendConnectionResponse(*announcer, connect_transaction_id);
EXPECT_FALSE(announcer->isIdle());
// The announcer should have sent a UDP scrape request.
// Inspect that request for validity.
sent = waitForAnnouncerToSendMessage(mediator);
EXPECT_FALSE(announcer->isIdle());
auto [scrape_transaction_id, info_hashes] = parseScrapeRequest(sent, connection_id);
expectEqual(request, info_hashes);
@ -340,7 +336,6 @@ TEST_F(AnnouncerUdpTest, canScrape)
auto arr = std::array<uint8_t, 256>{};
buf.toBuf(std::data(arr), response_size);
EXPECT_TRUE(announcer->handleMessage(std::data(arr), response_size));
EXPECT_TRUE(announcer->isIdle());
// confirm that announcer processed the response
EXPECT_TRUE(response);