Fix/long shutdown crash on shutdown (#4285)

This commit is contained in:
Charles Kerr 2022-11-30 13:00:34 -06:00 committed by GitHub
parent a0b5623769
commit f03fc9270b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 296 additions and 145 deletions

View File

@ -135,10 +135,11 @@ struct tr_scrape_info
class tr_announcer_impl final : public tr_announcer
{
public:
explicit tr_announcer_impl(tr_session* session_in, tr_announcer_udp& announcer_udp)
explicit tr_announcer_impl(tr_session* session_in, tr_announcer_udp& announcer_udp, std::atomic<size_t>& n_pending_stops)
: session{ session_in }
, announcer_udp_{ announcer_udp }
, upkeep_timer_{ session_in->timerMaker().create() }
, n_pending_stops_{ n_pending_stops }
{
upkeep_timer_->setCallback([this]() { this->upkeep(); });
upkeep_timer_->startRepeating(UpkeepInterval);
@ -166,11 +167,6 @@ public:
flushCloseMessages();
}
[[nodiscard]] size_t pendingAnnounces() const override
{
return pending_announces_;
}
void upkeep();
void onAnnounceDone(int tier_id, tr_announce_event event, bool is_running_on_success, tr_announce_response const& response);
@ -208,25 +204,31 @@ public:
void announce(tr_announce_request const& request, tr_announce_response_func on_response)
{
TR_ASSERT(!is_shutting_down_ || request.event == TR_ANNOUNCE_EVENT_STOPPED);
auto const is_stop = request.event == TR_ANNOUNCE_EVENT_STOPPED;
TR_ASSERT(!is_shutting_down_ || is_stop);
auto callback = [this, on_response = std::move(on_response)](tr_announce_response const& response)
if (is_stop)
{
TR_ASSERT(pending_announces_ > 0U);
--pending_announces_;
on_response(response);
};
++n_pending_stops_;
on_response =
[&n_stops = n_pending_stops_, on_response = std::move(on_response)](tr_announce_response const& response)
{
TR_ASSERT(n_stops > 0U);
--n_stops;
on_response(response);
};
}
if (auto const announce_sv = request.announce_url.sv();
tr_strvStartsWith(announce_sv, "http://"sv) || tr_strvStartsWith(announce_sv, "https://"sv))
{
++pending_announces_;
tr_tracker_http_announce(session, request, std::move(callback));
tr_tracker_http_announce(session, request, std::move(on_response));
}
else if (tr_strvStartsWith(announce_sv, "udp://"sv))
{
++pending_announces_;
announcer_udp_.announce(request, std::move(callback));
announcer_udp_.announce(request, std::move(on_response));
}
else
{
@ -243,7 +245,7 @@ private:
{
for (auto& stop : stops_)
{
announce(stop, {});
announce(stop, [](tr_announce_response const&) {});
}
stops_.clear();
@ -259,16 +261,19 @@ private:
std::set<tr_announce_request, StopsCompare> stops_;
size_t pending_announces_ = {};
std::atomic<size_t>& n_pending_stops_;
bool is_shutting_down_ = false;
};
std::unique_ptr<tr_announcer> tr_announcer::create(tr_session* session, tr_announcer_udp& announcer_udp)
std::unique_ptr<tr_announcer> tr_announcer::create(
tr_session* session,
tr_announcer_udp& announcer_udp,
std::atomic<size_t>& n_pending_stops)
{
TR_ASSERT(session != nullptr);
return std::make_unique<tr_announcer_impl>(session, announcer_udp);
return std::make_unique<tr_announcer_impl>(session, announcer_udp, n_pending_stops);
}
/***
@ -955,7 +960,7 @@ void tr_announcer_impl::removeTorrent(tr_torrent* tor)
for (auto const& tier : ta->tiers)
{
if (tier.isRunning)
if (tier.isRunning && tier.lastAnnounceSucceeded)
{
stops_.emplace(create_announce_request(this, tor, &tier, TR_ANNOUNCE_EVENT_STOPPED));
}

View File

@ -9,6 +9,7 @@
#error only libtransmission should #include this header.
#endif
#include <atomic>
#include <cstddef> // size_t
#include <cstdint> // uint32_t
#include <ctime>
@ -63,7 +64,10 @@ using tr_tracker_callback = void (*)(tr_torrent* tor, tr_tracker_event const* ev
class tr_announcer
{
public:
[[nodiscard]] static std::unique_ptr<tr_announcer> create(tr_session* session, tr_announcer_udp&);
[[nodiscard]] static std::unique_ptr<tr_announcer> create(
tr_session* session,
tr_announcer_udp&,
std::atomic<size_t>& n_pending_stops);
virtual ~tr_announcer() = default;
virtual tr_torrent_announcer* addTorrent(tr_torrent*, tr_tracker_callback callback, void* callback_data) = 0;
@ -71,9 +75,7 @@ public:
virtual void stopTorrent(tr_torrent* tor) = 0;
virtual void resetTorrent(tr_torrent* tor) = 0;
virtual void removeTorrent(tr_torrent* tor) = 0;
virtual void startShutdown() = 0;
[[nodiscard]] virtual size_t pendingAnnounces() const = 0;
};
std::unique_ptr<tr_announcer> tr_announcerCreate(tr_session* session);

View File

@ -347,9 +347,8 @@ struct tr_peer_socket tr_netOpenPeerUTPSocket(
if (session->utp_context != nullptr && tr_address_is_valid_for_peers(addr, port))
{
auto const [ss, sslen] = addr->toSockaddr(port);
auto* const socket = utp_create_socket(session->utp_context);
if (socket != nullptr)
if (auto* const socket = utp_create_socket(session->utp_context); socket != nullptr)
{
if (utp_connect(socket, reinterpret_cast<sockaddr const*>(&ss), sslen) != -1)
{

View File

@ -1208,7 +1208,7 @@ double tr_sessionGetRawSpeed_KBps(tr_session const* session, tr_direction dir)
return tr_toSpeedKBps(tr_sessionGetRawSpeed_Bps(session, dir));
}
void tr_session::closeImplPart1(std::promise<void>* closed_promise)
void tr_session::closeImplPart1(std::promise<void>* closed_promise, std::chrono::time_point<std::chrono::steady_clock> deadline)
{
is_closing_ = true;
@ -1250,20 +1250,20 @@ void tr_session::closeImplPart1(std::promise<void>* closed_promise)
// ...and now that those are queued, tell web_ that we're shutting
// down soon. This leaves the `event=stopped` going but refuses any
// new tasks.
this->web_->startShutdown();
this->web_->startShutdown(10s);
this->cache.reset();
// recycle the now-unused save_timer_ here to wait for UDP shutdown
TR_ASSERT(!save_timer_);
save_timer_ = timerMaker().create([this, closed_promise]() { closeImplPart2(closed_promise); });
save_timer_ = timerMaker().create([this, closed_promise, deadline]() { closeImplPart2(closed_promise, deadline); });
save_timer_->startRepeating(50ms);
}
void tr_session::closeImplPart2(std::promise<void>* closed_promise)
void tr_session::closeImplPart2(std::promise<void>* closed_promise, std::chrono::time_point<std::chrono::steady_clock> deadline)
{
// try to keep the UDP announcer alive long enough to send out
// all the &event=stopped tracker announces
if (announcer_->pendingAnnounces() != 0U)
if (n_pending_stops_ != 0U && std::chrono::steady_clock::now() < deadline)
{
announcer_udp_->upkeep();
return;
@ -1284,7 +1284,7 @@ void tr_session::closeImplPart2(std::promise<void>* closed_promise)
closed_promise->set_value();
}
void tr_sessionClose(tr_session* session)
void tr_sessionClose(tr_session* session, size_t timeout_secs)
{
TR_ASSERT(session != nullptr);
TR_ASSERT(!session->amInSessionThread());
@ -1293,8 +1293,9 @@ void tr_sessionClose(tr_session* session)
auto closed_promise = std::promise<void>{};
auto closed_future = closed_promise.get_future();
session->runInSessionThread([session, &closed_promise]() { session->closeImplPart1(&closed_promise); });
closed_future.wait_for(12s);
auto const deadline = std::chrono::steady_clock::now() + std::chrono::seconds{ timeout_secs };
session->runInSessionThread([&closed_promise, deadline, session]() { session->closeImplPart1(&closed_promise, deadline); });
closed_future.wait();
delete session;
}

View File

@ -933,8 +933,8 @@ private:
void setSettings(tr_variant* settings_dict, bool force);
void setSettings(tr_session_settings&& settings, bool force);
void closeImplPart1(std::promise<void>* closed_promise);
void closeImplPart2(std::promise<void>* closed_promise);
void closeImplPart1(std::promise<void>* closed_promise, std::chrono::time_point<std::chrono::steady_clock> deadline);
void closeImplPart2(std::promise<void>* closed_promise, std::chrono::time_point<std::chrono::steady_clock> deadline);
void onNowTimer();
@ -967,7 +967,7 @@ private:
friend uint16_t tr_sessionGetPeerPort(tr_session const* session);
friend uint16_t tr_sessionGetRPCPort(tr_session const* session);
friend uint16_t tr_sessionSetPeerPortRandom(tr_session* session);
friend void tr_sessionClose(tr_session* session);
friend void tr_sessionClose(tr_session* session, size_t timeout_secs);
friend void tr_sessionGetSettings(tr_session const* s, tr_variant* setme_dictionary);
friend void tr_sessionLimitSpeed(tr_session* session, tr_direction dir, bool limited);
friend void tr_sessionReloadBlocklists(tr_session* session);
@ -1080,6 +1080,10 @@ private:
/// fields that aren't trivial,
/// but are self-contained / don't hold references to others
// used during shutdown:
// how many &event=stopped announces are still being sent to trackers
std::atomic<size_t> n_pending_stops_ = {};
mutable std::recursive_mutex session_mutex_;
tr_stats session_stats_{ config_dir_, time(nullptr) };
@ -1152,7 +1156,7 @@ public:
std::unique_ptr<tr_announcer_udp> announcer_udp_ = tr_announcer_udp::create(announcer_udp_mediator_);
// depends-on: settings_, torrents_, web_, announcer_udp_
std::unique_ptr<tr_announcer> announcer_ = tr_announcer::create(this, *announcer_udp_);
std::unique_ptr<tr_announcer> announcer_ = tr_announcer::create(this, *announcer_udp_, n_pending_stops_);
// depends-on: public_peer_port_, udp_core_, dht_mediator_
std::unique_ptr<tr_dht> dht_;

View File

@ -234,9 +234,15 @@ void tr_sessionSet(tr_session* session, struct tr_variant* settings);
reload whatever blocklist files are found there */
void tr_sessionReloadBlocklists(tr_session* session);
/** @brief End a libtransmission session
@see tr_sessionInit() */
void tr_sessionClose(tr_session*);
/**
* @brief End a libtransmission session.
* @see tr_sessionInit()
*
* This may take some time while &event=stopped announces are sent to trackers.
*
* @param `timeout_secs` specifies how long to wait on these announces.
*/
void tr_sessionClose(tr_session*, size_t timeout_secs = 15);
/**
* @brief Return the session's configuration directory.

View File

@ -4,13 +4,14 @@
// License text can be found in the licenses/ folder.
#include <algorithm>
#include <atomic>
#include <array>
#include <atomic>
#include <condition_variable>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <stack>
#include <string>
#include <thread>
#include <utility>
@ -32,6 +33,7 @@
#include "utils-ev.h"
#include "utils.h"
#include "web.h"
#include "web-utils.h"
using namespace std::literals;
@ -43,6 +45,56 @@ using namespace std::literals;
****
***/
namespace curl_helpers
{
struct ShareDeleter
{
void operator()(CURLSH* shared)
{
if (shared == nullptr)
{
return;
}
[[maybe_unused]] auto const status = curl_share_cleanup(shared);
TR_ASSERT(status == CURLSHE_OK);
}
};
using shared_unique_ptr = std::unique_ptr<CURLSH, ShareDeleter>;
struct MultiDeleter
{
void operator()(CURLM* multi)
{
if (multi == nullptr)
{
return;
}
[[maybe_unused]] auto const status = curl_multi_cleanup(multi);
TR_ASSERT(status == CURLM_OK);
}
};
using multi_unique_ptr = std::unique_ptr<CURLM, MultiDeleter>;
struct EasyDeleter
{
void operator()(CURL* val) const noexcept
{
if (val != nullptr)
{
curl_easy_cleanup(val);
}
}
};
using easy_unique_ptr = std::unique_ptr<CURL, EasyDeleter>;
} // namespace curl_helpers
#ifdef _WIN32
static CURLcode ssl_context_func(CURL* /*curl*/, void* ssl_ctx, void* /*user_data*/)
{
@ -136,8 +188,8 @@ public:
this->user_agent = *ua;
}
auto const lock = std::unique_lock(queued_tasks_mutex);
curl_thread = std::make_unique<std::thread>(curlThreadFunc, this);
auto const lock = std::unique_lock{ tasks_mutex_ };
curl_thread = std::make_unique<std::thread>(&Impl::curlThreadFunc, this);
}
Impl(Impl&&) = delete;
@ -147,52 +199,55 @@ public:
~Impl()
{
run_mode = RunMode::CloseNow;
queued_tasks_cv.notify_one();
deadline_ = tr_time();
queued_tasks_cv_.notify_one();
curl_thread->join();
}
void startShutdown()
void startShutdown(std::chrono::milliseconds deadline)
{
run_mode = RunMode::CloseSoon;
queued_tasks_cv.notify_one();
}
[[nodiscard]] bool isClosed() const noexcept
{
return is_closed_;
deadline_ = tr_time() + std::chrono::duration_cast<std::chrono::seconds>(deadline).count();
queued_tasks_cv_.notify_one();
}
void fetch(FetchOptions&& options)
{
if (run_mode != RunMode::Run)
if (deadline_exists())
{
return;
}
auto const lock = std::unique_lock(queued_tasks_mutex);
queued_tasks.emplace_back(new Task{ *this, std::move(options) });
queued_tasks_cv.notify_one();
auto const lock = std::unique_lock{ tasks_mutex_ };
queued_tasks_.emplace_back(*this, std::move(options));
queued_tasks_cv_.notify_one();
}
class Task
{
private:
libtransmission::evhelpers::evbuffer_unique_ptr privbuf{ evbuffer_new() };
std::unique_ptr<CURL, void (*)(CURL*)> const easy_handle{ curl_easy_init(), curl_easy_cleanup };
tr_web::FetchOptions options;
public:
Task(tr_web::Impl& impl_in, tr_web::FetchOptions&& options_in)
: options{ std::move(options_in) }
, impl{ impl_in }
: impl{ impl_in }
, options{ std::move(options_in) }
, easy_{ impl.get_easy(tr_urlParse(options.url)->host) }
{
response.user_data = options.done_func_user_data;
}
// Some of the curl_easy_setopt() args took a pointer to this task.
// Disable moving so that we don't accidentally invalidate those pointers.
Task(Task&&) = delete;
Task(Task const&) = delete;
Task& operator=(Task&&) = delete;
Task& operator=(Task const&) = delete;
~Task()
{
easy_dispose(easy_);
}
[[nodiscard]] auto* easy() const
{
return easy_handle.get();
return easy_;
}
[[nodiscard]] auto* body() const
@ -269,17 +324,48 @@ public:
void done()
{
if (options.done_func == nullptr)
if (!options.done_func)
{
return;
}
response.body.assign(reinterpret_cast<char const*>(evbuffer_pullup(body(), -1)), evbuffer_get_length(body()));
impl.mediator.run(std::move(options.done_func), std::move(this->response));
options.done_func = {};
}
[[nodiscard]] bool operator==(Task const& that) const noexcept
{
return easy() == that.easy();
}
tr_web::Impl& impl;
tr_web::FetchResponse response;
private:
void easy_dispose(CURL* easy)
{
if (easy == nullptr)
{
return;
}
if (auto const url = tr_urlParse(options.url); url)
{
curl_easy_reset(easy);
impl.easy_pool_[std::string{ url->host }].emplace(easy);
}
else
{
curl_easy_cleanup(easy);
}
}
libtransmission::evhelpers::evbuffer_unique_ptr privbuf{ evbuffer_new() };
tr_web::FetchOptions options;
CURL* const easy_;
};
static auto constexpr BandwidthPauseMsec = long{ 500 };
@ -299,14 +385,43 @@ public:
std::unique_ptr<std::thread> curl_thread;
enum class RunMode
{
Run,
CloseSoon, // no new tasks; exit when running tasks finish
CloseNow // exit now even if tasks are running
};
// if unset: steady-state, all is good
// if set: do not accept new tasks
// if set and deadline reached: kill all remaining tasks
std::atomic<time_t> deadline_ = {};
std::atomic<RunMode> run_mode = RunMode::Run;
[[nodiscard]] auto deadline() const
{
return deadline_.load();
}
[[nodiscard]] bool deadline_exists() const
{
return deadline() != time_t{};
}
[[nodiscard]] bool deadline_reached() const
{
return deadline_exists() && deadline() <= tr_time();
}
[[nodiscard]] CURL* get_easy(std::string_view host)
{
CURL* easy = nullptr;
if (auto iter = easy_pool_.find(host); iter != std::end(easy_pool_) && !std::empty(iter->second))
{
easy = iter->second.top().release();
iter->second.pop();
}
if (easy == nullptr)
{
easy = curl_easy_init();
}
return easy;
}
static size_t onDataReceived(void* data, size_t size, size_t nmemb, void* vtask)
{
@ -356,34 +471,34 @@ public:
}
#endif
static void initEasy(tr_web::Impl* impl, Task* task)
void initEasy(Task& task)
{
TR_ASSERT(std::this_thread::get_id() == impl->curl_thread->get_id());
auto* const e = task->easy();
TR_ASSERT(std::this_thread::get_id() == curl_thread->get_id());
auto* const e = task.easy();
(void)curl_easy_setopt(e, CURLOPT_SHARE, impl->shared());
(void)curl_easy_setopt(e, CURLOPT_SHARE, shared());
(void)curl_easy_setopt(e, CURLOPT_DNS_CACHE_TIMEOUT, DnsCacheTimeoutSecs);
(void)curl_easy_setopt(e, CURLOPT_AUTOREFERER, 1L);
(void)curl_easy_setopt(e, CURLOPT_ENCODING, "");
(void)curl_easy_setopt(e, CURLOPT_FOLLOWLOCATION, 1L);
(void)curl_easy_setopt(e, CURLOPT_MAXREDIRS, -1L);
(void)curl_easy_setopt(e, CURLOPT_NOSIGNAL, 1L);
(void)curl_easy_setopt(e, CURLOPT_PRIVATE, task);
(void)curl_easy_setopt(e, CURLOPT_IPRESOLVE, task->ipProtocol());
(void)curl_easy_setopt(e, CURLOPT_PRIVATE, &task);
(void)curl_easy_setopt(e, CURLOPT_IPRESOLVE, task.ipProtocol());
#ifdef USE_LIBCURL_SOCKOPT
(void)curl_easy_setopt(e, CURLOPT_SOCKOPTFUNCTION, onSocketCreated);
(void)curl_easy_setopt(e, CURLOPT_SOCKOPTDATA, task);
(void)curl_easy_setopt(e, CURLOPT_SOCKOPTDATA, &task);
#endif
if (!impl->curl_ssl_verify)
if (!curl_ssl_verify)
{
(void)curl_easy_setopt(e, CURLOPT_SSL_VERIFYHOST, 0L);
(void)curl_easy_setopt(e, CURLOPT_SSL_VERIFYPEER, 0L);
}
else if (!std::empty(impl->curl_ca_bundle))
else if (!std::empty(curl_ca_bundle))
{
(void)curl_easy_setopt(e, CURLOPT_CAINFO, impl->curl_ca_bundle.c_str());
(void)curl_easy_setopt(e, CURLOPT_CAINFO, curl_ca_bundle.c_str());
}
else
{
@ -392,46 +507,46 @@ public:
#endif
}
if (!impl->curl_proxy_ssl_verify)
if (!curl_proxy_ssl_verify)
{
(void)curl_easy_setopt(e, CURLOPT_CAINFO, NULL);
(void)curl_easy_setopt(e, CURLOPT_CAPATH, NULL);
(void)curl_easy_setopt(e, CURLOPT_PROXY_SSL_VERIFYHOST, 0L);
(void)curl_easy_setopt(e, CURLOPT_PROXY_SSL_VERIFYPEER, 0L);
}
else if (!std::empty(impl->curl_ca_bundle))
else if (!std::empty(curl_ca_bundle))
{
(void)curl_easy_setopt(e, CURLOPT_PROXY_CAINFO, impl->curl_ca_bundle.c_str());
(void)curl_easy_setopt(e, CURLOPT_PROXY_CAINFO, curl_ca_bundle.c_str());
}
if (auto const& ua = impl->user_agent; !std::empty(ua))
if (auto const& ua = user_agent; !std::empty(ua))
{
(void)curl_easy_setopt(e, CURLOPT_USERAGENT, ua.c_str());
}
(void)curl_easy_setopt(e, CURLOPT_TIMEOUT, static_cast<long>(task->timeoutSecs().count()));
(void)curl_easy_setopt(e, CURLOPT_URL, task->url().c_str());
(void)curl_easy_setopt(e, CURLOPT_VERBOSE, impl->curl_verbose ? 1L : 0L);
(void)curl_easy_setopt(e, CURLOPT_WRITEDATA, task);
(void)curl_easy_setopt(e, CURLOPT_TIMEOUT, static_cast<long>(task.timeoutSecs().count()));
(void)curl_easy_setopt(e, CURLOPT_URL, task.url().c_str());
(void)curl_easy_setopt(e, CURLOPT_VERBOSE, curl_verbose ? 1L : 0L);
(void)curl_easy_setopt(e, CURLOPT_WRITEDATA, &task);
(void)curl_easy_setopt(e, CURLOPT_WRITEFUNCTION, onDataReceived);
(void)curl_easy_setopt(e, CURLOPT_MAXREDIRS, MaxRedirects);
if (auto const addrstr = task->publicAddress(); addrstr)
if (auto const addrstr = task.publicAddress(); addrstr)
{
(void)curl_easy_setopt(e, CURLOPT_INTERFACE, addrstr->c_str());
}
if (auto const& cookies = task->cookies(); cookies)
if (auto const& cookies = task.cookies(); cookies)
{
(void)curl_easy_setopt(e, CURLOPT_COOKIE, cookies->c_str());
}
if (auto const& file = impl->cookie_file; !std::empty(file))
if (auto const& file = cookie_file; !std::empty(file))
{
(void)curl_easy_setopt(e, CURLOPT_COOKIEFILE, file.c_str());
}
if (auto const& range = task->range(); range)
if (auto const& range = task.range(); range)
{
/* don't bother asking the server to compress webseed fragments */
(void)curl_easy_setopt(e, CURLOPT_ENCODING, "identity");
@ -465,49 +580,82 @@ public:
}
}
// the thread started by Impl.curl_thread runs this function
static void curlThreadFunc(Impl* impl)
[[nodiscard]] bool is_idle() const noexcept
{
auto const multi = std::unique_ptr<CURLM, CURLMcode (*)(CURLM*)>(curl_multi_init(), curl_multi_cleanup);
return std::empty(queued_tasks_) && std::empty(running_tasks_);
}
void remove_task(Task& task)
{
auto const lock = std::unique_lock{ tasks_mutex_ };
auto const iter = std::find(std::begin(running_tasks_), std::end(running_tasks_), task);
TR_ASSERT(iter != std::end(running_tasks_));
if (iter == std::end(running_tasks_))
{
return;
}
iter->done();
running_tasks_.erase(iter);
}
void timeout_task(Task& task)
{
task.response.status = 408; // request timed out
task.response.did_timeout = true;
remove_task(task);
}
// the thread started by Impl.curl_thread runs this function
void curlThreadFunc()
{
auto const multi = curl_helpers::multi_unique_ptr{ curl_multi_init() };
auto running_tasks = int{ 0 };
auto repeats = unsigned{};
for (;;)
{
if (impl->run_mode == RunMode::CloseNow)
if (deadline_reached())
{
while (!std::empty(running_tasks_))
{
auto& task = running_tasks_.front();
curl_multi_remove_handle(multi.get(), task.easy());
timeout_task(task);
}
}
if (deadline_exists() && is_idle())
{
break;
}
if (impl->run_mode == RunMode::CloseSoon && std::empty(impl->queued_tasks) && running_tasks == 0)
if (auto lock = std::unique_lock{ tasks_mutex_ }; lock.owns_lock())
{
break;
}
{
auto lock = std::unique_lock(impl->queued_tasks_mutex);
// sleep until there's something to do
auto const has_work = [&running_tasks, impl]()
auto const stop_waiting = [this]()
{
return running_tasks > 0 || !std::empty(impl->queued_tasks) || impl->run_mode != RunMode::Run;
return !is_idle() || !deadline_exists();
};
if (!has_work())
if (!stop_waiting())
{
impl->queued_tasks_cv.wait(lock, has_work);
queued_tasks_cv_.wait(lock, stop_waiting);
}
// add queued tasks
for (auto* task : impl->queued_tasks)
if (!std::empty(queued_tasks_))
{
tr_logAddTrace(fmt::format("adding task to curl: '{}'", task->url()));
initEasy(impl, task);
curl_multi_add_handle(multi.get(), task->easy());
for (auto& task : queued_tasks_)
{
initEasy(task);
curl_multi_add_handle(multi.get(), task.easy());
}
running_tasks_.splice(std::end(running_tasks_), queued_tasks_);
}
impl->queued_tasks.clear();
}
impl->resumePausedTasks();
resumePausedTasks();
// Adapted from https://curl.se/libcurl/c/curl_multi_wait.html docs.
// 'numfds' being zero means either a timeout or no file descriptors to
@ -530,7 +678,8 @@ public:
}
// nonblocking update of the tasks
curl_multi_perform(multi.get(), &running_tasks);
auto n_running = int{};
curl_multi_perform(multi.get(), &n_running);
// process any tasks that just finished
CURLMsg* msg = nullptr;
@ -553,24 +702,20 @@ public:
task->response.did_timeout = task->response.status == 0 &&
std::chrono::duration<double>(total_time) >= task->timeoutSecs();
curl_multi_remove_handle(multi.get(), e);
task->done();
delete task;
remove_task(*task);
}
}
}
// Discard any queued tasks.
// This shouldn't happen, but do it just in case
std::for_each(std::begin(impl->queued_tasks), std::end(impl->queued_tasks), [](auto* task) { delete task; });
impl->is_closed_ = true;
}
std::unique_ptr<CURLSH, CURLSHcode (*)(CURLSH*)> const curlsh_{ curl_share_init(), curl_share_cleanup };
curl_helpers::shared_unique_ptr const curlsh_{ curl_share_init() };
std::mutex queued_tasks_mutex;
std::condition_variable queued_tasks_cv;
std::list<Task*> queued_tasks;
std::map<std::string /*host*/, std::stack<curl_helpers::easy_unique_ptr>, std::less<>> easy_pool_;
std::mutex tasks_mutex_;
std::condition_variable queued_tasks_cv_;
std::list<Task> queued_tasks_;
std::list<Task> running_tasks_;
CURLSH* shared()
{
@ -599,8 +744,6 @@ public:
static std::once_flag curl_init_flag;
std::atomic<bool> is_closed_ = false;
std::multimap<uint64_t /*tr_time_msec()*/, CURL*> paused_easy_handles;
static void curlInit()
@ -633,12 +776,7 @@ void tr_web::fetch(FetchOptions&& options)
impl_->fetch(std::move(options));
}
bool tr_web::isClosed() const noexcept
void tr_web::startShutdown(std::chrono::milliseconds deadline)
{
return impl_->isClosed();
}
void tr_web::startShutdown()
{
impl_->startShutdown();
impl_->startShutdown(deadline);
}

View File

@ -97,11 +97,7 @@ public:
// Notify tr_web that it's going to be destroyed soon.
// New fetch() tasks will be rejected, but already-running tasks
// are left alone so that they can finish.
void startShutdown();
// True when tr_web is ready to be destroyed.
// Will never be true until after closeSoon() is called.
[[nodiscard]] bool isClosed() const noexcept;
void startShutdown(std::chrono::milliseconds);
// If you want to give running tasks a chance to finish, call closeSoon()
// before destroying the tr_web object. Deleting the object will cancel