refactor: tr_web (#2640)

* fixup! refactor: tr_web (#2633)

fix: race condition in web threadfunc during bootstrap

fixes #2639
This commit is contained in:
Charles Kerr 2022-02-17 17:35:57 -06:00 committed by GitHub
parent 33cb3b0a73
commit 29af76d977
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 84 additions and 95 deletions

View File

@ -124,7 +124,7 @@ static char* tr_strlratio(char* buf, double ratio, size_t buflen)
static bool waitingOnWeb;
static void onTorrentFileDownloaded(tr_web::FetchResponse&& response)
static void onTorrentFileDownloaded(tr_web::FetchResponse const& response)
{
auto* ctor = static_cast<tr_ctor*>(response.user_data);
tr_ctorSetMetainfo(ctor, std::data(response.body), std::size(response.body), nullptr);

View File

@ -73,7 +73,7 @@ Glib::RefPtr<Gdk::Pixbuf> favicon_load_from_cache(std::string const& host)
}
}
void favicon_web_done_cb(tr_web::FetchResponse&& response);
void favicon_web_done_cb(tr_web::FetchResponse const& response);
bool favicon_web_done_idle_cb(std::unique_ptr<favicon_data> fav)
{
@ -102,7 +102,7 @@ bool favicon_web_done_idle_cb(std::unique_ptr<favicon_data> fav)
return false;
}
void favicon_web_done_cb(tr_web::FetchResponse&& response)
void favicon_web_done_cb(tr_web::FetchResponse const& response)
{
auto* const fav = static_cast<favicon_data*>(response.user_data);
fav->contents = response.body;

View File

@ -281,7 +281,7 @@ struct announce_data
char log_name[128];
};
static void onAnnounceDone(tr_web::FetchResponse&& web_response)
static void onAnnounceDone(tr_web::FetchResponse const& web_response)
{
auto const& [status, body, did_connect, did_timeout, vdata] = web_response;
auto* data = static_cast<struct announce_data*>(vdata);
@ -445,7 +445,7 @@ struct scrape_data
char log_name[128];
};
static void onScrapeDone(tr_web::FetchResponse&& web_response)
static void onScrapeDone(tr_web::FetchResponse const& web_response)
{
auto const& [status, body, did_connect, did_timeout, vdata] = web_response;
auto* const data = static_cast<struct scrape_data*>(vdata);

View File

@ -1340,7 +1340,7 @@ static char const* torrentRenamePath(
****
***/
static void portTested(tr_web::FetchResponse&& web_response)
static void onPortTested(tr_web::FetchResponse const& web_response)
{
auto const& [status, body, did_connect, did_tmieout, user_data] = web_response;
char result[1024];
@ -1368,7 +1368,7 @@ static char const* portTest(
{
auto const port = tr_sessionGetPeerPort(session);
auto const url = tr_strvJoin("https://portcheck.transmissionbt.com/"sv, std::to_string(port));
session->web->fetch({ url, portTested, idle_data });
session->web->fetch({ url, onPortTested, idle_data });
return nullptr;
}
@ -1376,7 +1376,7 @@ static char const* portTest(
****
***/
static void gotNewBlocklist(tr_web::FetchResponse&& web_response)
static void onBlocklistFetched(tr_web::FetchResponse const& web_response)
{
auto const& [status, body, did_connect, did_timeout, user_data] = web_response;
auto* data = static_cast<struct tr_rpc_idle_data*>(user_data);
@ -1446,7 +1446,7 @@ static char const* blocklistUpdate(
tr_variant* /*args_out*/,
struct tr_rpc_idle_data* idle_data)
{
session->web->fetch({ session->blocklistUrl(), gotNewBlocklist, idle_data });
session->web->fetch({ session->blocklistUrl(), onBlocklistFetched, idle_data });
return nullptr;
}
@ -1501,7 +1501,7 @@ struct add_torrent_idle_data
tr_ctor* ctor;
};
static void gotMetadataFromURL(tr_web::FetchResponse&& web_response)
static void onMetadataFetched(tr_web::FetchResponse const& web_response)
{
auto const& [status, body, did_connect, did_timeout, user_data] = web_response;
auto* data = static_cast<struct add_torrent_idle_data*>(user_data);
@ -1520,7 +1520,7 @@ static void gotMetadataFromURL(tr_web::FetchResponse&& web_response)
else
{
char result[1024];
tr_snprintf(result, sizeof(result), "gotMetadataFromURL: http error %ld: %s", status, tr_webGetResponseStr(status));
tr_snprintf(result, sizeof(result), "onMetadataFetched: http error %ld: %s", status, tr_webGetResponseStr(status));
tr_idle_function_done(data->data, result);
}
@ -1656,7 +1656,7 @@ static char const* torrentAdd(tr_session* session, tr_variant* args_in, tr_varia
d->data = idle_data;
d->ctor = ctor;
auto options = tr_web::FetchOptions{ filename, gotMetadataFromURL, d };
auto options = tr_web::FetchOptions{ filename, onMetadataFetched, d };
options.cookies = cookies;
session->web->fetch(std::move(options));
}

View File

@ -161,7 +161,7 @@ void tr_session::WebController::notifyBandwidthConsumed(int torrent_id, size_t b
}
}
void tr_session::WebController::run(tr_web::FetchDoneFunc func, tr_web::FetchResponse&& response) const
void tr_session::WebController::run(tr_web::FetchDoneFunc&& func, tr_web::FetchResponse&& response) const
{
// marshall the `func` call into the libtransmission thread
@ -170,7 +170,7 @@ void tr_session::WebController::run(tr_web::FetchDoneFunc func, tr_web::FetchRes
auto constexpr callback = [](void* vwrapped)
{
auto* const wrapped = static_cast<wrapper_t*>(vwrapped);
wrapped->first(std::move(wrapped->second));
wrapped->first(wrapped->second);
delete wrapped;
};

View File

@ -351,7 +351,7 @@ public:
[[nodiscard]] unsigned int clamp(int bandwidth_tag, unsigned int byte_count) const override;
void notifyBandwidthConsumed(int torrent_id, size_t byte_count) override;
// runs the tr_web::fetch response callback in the libtransmission thread
void run(tr_web::FetchDoneFunc func, tr_web::FetchResponse&& response) const override;
void run(tr_web::FetchDoneFunc&& func, tr_web::FetchResponse&& response) const override;
private:
tr_session* const session_;

View File

@ -4,8 +4,9 @@
// License text can be found in the licenses/ folder.
#include <algorithm>
#include <memory>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <string_view>
@ -102,7 +103,7 @@ static CURLcode ssl_context_func(CURL* /*curl*/, void* ssl_ctx, void* /*user_dat
class tr_web::Impl
{
public:
Impl(Controller& controller_in)
explicit Impl(Controller& controller_in)
: controller{ controller_in }
{
std::call_once(curl_init_flag, curlInit);
@ -131,7 +132,7 @@ public:
this->user_agent = *ua;
}
curl_thread = std::make_unique<std::thread>(tr_webThreadFunc, this);
curl_thread = std::make_unique<std::thread>(curlThreadFunc, this);
}
~Impl()
@ -157,10 +158,8 @@ public:
return;
}
auto const lock = std::unique_lock(web_tasks_mutex);
auto* const task = new Task{ *this, std::move(options) };
task->next = tasks;
tasks = task;
auto const lock = std::unique_lock(queued_tasks_mutex);
queued_tasks.emplace_back(new Task{ *this, std::move(options) });
}
private:
@ -169,7 +168,7 @@ private:
private:
std::shared_ptr<evbuffer> const privbuf{ evbuffer_new(), evbuffer_free };
std::shared_ptr<CURL> const easy_handle{ curl_easy_init(), curl_easy_cleanup };
tr_web::FetchOptions const options;
tr_web::FetchOptions options;
public:
Task(tr_web::Impl& impl_in, tr_web::FetchOptions&& options_in)
@ -232,12 +231,11 @@ private:
}
response.body.assign(reinterpret_cast<char const*>(evbuffer_pullup(body(), -1)), evbuffer_get_length(body()));
impl.controller.run(options.done_func, std::move(this->response));
impl.controller.run(std::move(options.done_func), std::move(this->response));
}
tr_web::Impl& impl;
tr_web::FetchResponse response;
Task* next = nullptr;
};
static auto constexpr BandwidthPauseMsec = long{ 500 };
@ -251,9 +249,6 @@ private:
std::string curl_ca_bundle;
std::recursive_mutex web_tasks_mutex;
Task* tasks = nullptr;
std::string cookie_file;
std::string user_agent;
@ -321,54 +316,54 @@ private:
TR_ASSERT(std::this_thread::get_id() == impl->curl_thread->get_id());
auto* const e = task->easy();
curl_easy_setopt(e, CURLOPT_SHARE, impl->shared());
curl_easy_setopt(e, CURLOPT_DNS_CACHE_TIMEOUT, DnsCacheTimeoutSecs);
curl_easy_setopt(e, CURLOPT_AUTOREFERER, 1L);
curl_easy_setopt(e, CURLOPT_ENCODING, "");
curl_easy_setopt(e, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(e, CURLOPT_MAXREDIRS, -1L);
curl_easy_setopt(e, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(e, CURLOPT_PRIVATE, task);
(void)curl_easy_setopt(e, CURLOPT_SHARE, impl->shared());
(void)curl_easy_setopt(e, CURLOPT_DNS_CACHE_TIMEOUT, DnsCacheTimeoutSecs);
(void)curl_easy_setopt(e, CURLOPT_AUTOREFERER, 1L);
(void)curl_easy_setopt(e, CURLOPT_ENCODING, "");
(void)curl_easy_setopt(e, CURLOPT_FOLLOWLOCATION, 1L);
(void)curl_easy_setopt(e, CURLOPT_MAXREDIRS, -1L);
(void)curl_easy_setopt(e, CURLOPT_NOSIGNAL, 1L);
(void)curl_easy_setopt(e, CURLOPT_PRIVATE, task);
#ifdef USE_LIBCURL_SOCKOPT
curl_easy_setopt(e, CURLOPT_SOCKOPTFUNCTION, onSocketCreated);
curl_easy_setopt(e, CURLOPT_SOCKOPTDATA, task);
(void)curl_easy_setopt(e, CURLOPT_SOCKOPTFUNCTION, onSocketCreated);
(void)curl_easy_setopt(e, CURLOPT_SOCKOPTDATA, task);
#endif
if (!impl->curl_ssl_verify)
{
curl_easy_setopt(e, CURLOPT_SSL_VERIFYHOST, 0L);
curl_easy_setopt(e, CURLOPT_SSL_VERIFYPEER, 0L);
(void)curl_easy_setopt(e, CURLOPT_SSL_VERIFYHOST, 0L);
(void)curl_easy_setopt(e, CURLOPT_SSL_VERIFYPEER, 0L);
}
else if (!std::empty(impl->curl_ca_bundle))
{
curl_easy_setopt(e, CURLOPT_CAINFO, impl->curl_ca_bundle.c_str());
(void)curl_easy_setopt(e, CURLOPT_CAINFO, impl->curl_ca_bundle.c_str());
}
else
{
curl_easy_setopt(e, CURLOPT_SSL_CTX_FUNCTION, ssl_context_func);
(void)curl_easy_setopt(e, CURLOPT_SSL_CTX_FUNCTION, ssl_context_func);
}
if (!impl->curl_proxy_ssl_verify)
{
curl_easy_setopt(e, CURLOPT_PROXY_SSL_VERIFYHOST, 0L);
curl_easy_setopt(e, CURLOPT_PROXY_SSL_VERIFYPEER, 0L);
(void)curl_easy_setopt(e, CURLOPT_PROXY_SSL_VERIFYHOST, 0L);
(void)curl_easy_setopt(e, CURLOPT_PROXY_SSL_VERIFYPEER, 0L);
}
else if (!std::empty(impl->curl_ca_bundle))
{
curl_easy_setopt(e, CURLOPT_PROXY_CAINFO, impl->curl_ca_bundle.c_str());
(void)curl_easy_setopt(e, CURLOPT_PROXY_CAINFO, impl->curl_ca_bundle.c_str());
}
if (auto const& ua = impl->user_agent; !std::empty(ua))
{
curl_easy_setopt(e, CURLOPT_USERAGENT, ua.c_str());
(void)curl_easy_setopt(e, CURLOPT_USERAGENT, ua.c_str());
}
curl_easy_setopt(e, CURLOPT_TIMEOUT, task->timeoutSecs());
curl_easy_setopt(e, CURLOPT_URL, task->url().c_str());
curl_easy_setopt(e, CURLOPT_VERBOSE, impl->curl_verbose ? 1L : 0L);
curl_easy_setopt(e, CURLOPT_WRITEDATA, task);
curl_easy_setopt(e, CURLOPT_WRITEFUNCTION, onDataReceived);
(void)curl_easy_setopt(e, CURLOPT_TIMEOUT, task->timeoutSecs());
(void)curl_easy_setopt(e, CURLOPT_URL, task->url().c_str());
(void)curl_easy_setopt(e, CURLOPT_VERBOSE, impl->curl_verbose ? 1L : 0L);
(void)curl_easy_setopt(e, CURLOPT_WRITEDATA, task);
(void)curl_easy_setopt(e, CURLOPT_WRITEFUNCTION, onDataReceived);
if (auto const addrstr = impl->controller.publicAddress(); addrstr)
{
@ -388,8 +383,8 @@ private:
if (auto const& range = task->range(); range)
{
/* don't bother asking the server to compress webseed fragments */
curl_easy_setopt(e, CURLOPT_ENCODING, "identity");
curl_easy_setopt(e, CURLOPT_RANGE, range->c_str());
(void)curl_easy_setopt(e, CURLOPT_ENCODING, "identity");
(void)curl_easy_setopt(e, CURLOPT_RANGE, range->c_str());
}
}
@ -420,13 +415,11 @@ private:
}
// the thread started by Impl.curl_thread runs this function
static void tr_webThreadFunc(void* vimpl)
static void curlThreadFunc(Impl* impl)
{
auto* impl = static_cast<tr_web::Impl*>(vimpl);
TR_ASSERT(std::this_thread::get_id() == impl->curl_thread->get_id());
auto const multi = std::shared_ptr<CURLM>(curl_multi_init(), curl_multi_cleanup);
auto running_tasks = int{ 0 };
auto repeats = unsigned{};
for (;;)
{
@ -435,26 +428,21 @@ private:
break;
}
if (impl->run_mode == RunMode::CloseSoon && impl->tasks == nullptr)
if (impl->run_mode == RunMode::CloseSoon && std::empty(impl->queued_tasks) && running_tasks == 0)
{
break;
}
/* add tasks from the queue */
// add queued tasks
{
auto const lock = std::unique_lock(impl->web_tasks_mutex);
while (impl->tasks != nullptr)
auto const lock = std::unique_lock(impl->queued_tasks_mutex);
for (auto* task : impl->queued_tasks)
{
/* pop the task */
auto* const task = impl->tasks;
impl->tasks = task->next;
task->next = nullptr;
dbgmsg("adding task to curl: [%s]", task->url().c_str());
initEasy(impl, task);
curl_multi_add_handle(multi.get(), task->easy());
}
impl->queued_tasks.clear();
}
impl->resumePausedTasks();
@ -479,12 +467,12 @@ private:
repeats = 0;
}
/* call curl_multi_perform() */
auto unused = int{};
curl_multi_perform(multi.get(), &unused);
// nonblocking update of the tasks
curl_multi_perform(multi.get(), &running_tasks);
/* pump completed tasks from the multi */
// process any tasks that just finished
CURLMsg* msg = nullptr;
auto unused = int{};
while ((msg = curl_multi_info_read(multi.get(), &unused)) != nullptr)
{
if (msg->msg == CURLMSG_DONE && msg->easy_handle != nullptr)
@ -508,15 +496,9 @@ private:
}
}
/* Discard any remaining tasks.
* This is rare, but can happen on shutdown with unresponsive trackers. */
while (impl->tasks != nullptr)
{
auto* const task = impl->tasks;
impl->tasks = task->next;
dbgmsg("Discarding task \"%s\"", task->url().c_str());
delete task;
}
// Discard any queued tasks.
// This shouldn't happen, but do it just in case
std::for_each(std::begin(impl->queued_tasks), std::end(impl->queued_tasks), [](auto* task) { delete task; });
impl->is_closed_ = true;
}
@ -524,6 +506,9 @@ private:
private:
std::shared_ptr<CURLSH> const curlsh_{ curl_share_init(), curl_share_cleanup };
std::recursive_mutex queued_tasks_mutex;
std::list<Task*> queued_tasks;
CURLSH* shared()
{
return curlsh_.get();

View File

@ -5,8 +5,9 @@
#pragma once
#include <optional>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
@ -26,14 +27,15 @@ public:
void* user_data;
};
using FetchDoneFunc = void (*)(FetchResponse&& response);
// Callback to invoke when fetch() is done
using FetchDoneFunc = std::function<void(FetchResponse const&)>;
class FetchOptions
{
public:
FetchOptions(std::string_view url_in, FetchDoneFunc done_func_in, void* done_func_user_data_in)
FetchOptions(std::string_view url_in, FetchDoneFunc&& done_func_in, void* done_func_user_data_in)
: url{ url_in }
, done_func{ done_func_in }
, done_func{ std::move(done_func_in) }
, done_func_user_data{ done_func_user_data_in }
{
}
@ -56,8 +58,8 @@ public:
std::optional<int> speed_limit_tag;
// Optionaly set the underlying sockets' send/receive buffers' size.
// Can be useful for scrapes / announces where the payload is known
// to be small.
// Can be used to conserve resources for scrapes and announces, where
// the payload is known to be small.
std::optional<int> sndbuf;
std::optional<int> rcvbuf;
@ -83,16 +85,16 @@ public:
// Will never be true until after closeSoon() is called.
[[nodiscard]] bool isClosed() const;
// closeSoon() *should* be called first, but OK to destroy tr_web before
// isClosed() is true, e.g. there could be a hung fetch task that hasn't
// timmed out yet. Deleting the tr_web object will force-terminate any
// pending tasks.
// If you want to give running tasks a chance to finish, call closeSoon()
// before destroying the tr_web object. Deleting the object will cancel
// all of its tasks.
~tr_web();
/**
* Mediates between tr_web and its clients.
*
* NB: Note that tr_web calls all these methods in the web thread.
* NB: Note that tr_web calls all these methods from its own thread.
* Overridden methods should take care to be threadsafe.
*/
class Controller
{
@ -129,12 +131,14 @@ public:
}
// Invoke the user-provided fetch callback
virtual void run(FetchDoneFunc func, FetchResponse&& response) const
virtual void run(FetchDoneFunc&& func, FetchResponse&& response) const
{
func(std::move(response));
func(response);
}
};
// Note that tr_web does no management of the `controller` reference.
// The caller must ensure `controller` is valid for tr_web's lifespan.
static std::unique_ptr<tr_web> create(Controller& controller);
private:

View File

@ -405,7 +405,7 @@ void on_idle(tr_webseed* w)
}
}
void onWebResponse(tr_web::FetchResponse&& web_response)
void onPartialDataFetched(tr_web::FetchResponse const& web_response)
{
auto const& [status, body, did_connect, did_timeout, vtask] = web_response;
bool const success = status == 206;
@ -508,7 +508,7 @@ void task_request_next_chunk(tr_webseed_task* t)
uint64_t this_pass = std::min(remain, tor->fileSize(file_index) - file_offset);
auto const url = make_url(t->webseed, tor->fileSubpath(file_index));
auto options = tr_web::FetchOptions{ url, onWebResponse, t };
auto options = tr_web::FetchOptions{ url, onPartialDataFetched, t };
options.range = tr_strvJoin(std::to_string(file_offset), "-"sv, std::to_string(file_offset + this_pass - 1));
options.speed_limit_tag = tor->uniqueId;
options.buffer = t->content();