From d2d7987553dd24779de20dccc00b465eb79dccce Mon Sep 17 00:00:00 2001 From: Yat Ho Date: Sun, 24 Dec 2023 22:32:14 +0800 Subject: [PATCH] refactor: allow explicitly queuing functions in session thread (#6406) * feat: allow explicitly queuing functions in session thread * refactor: rename `tr_session_thread`-related functions to snake_case --- libtransmission/rpc-server.cc | 6 ++--- libtransmission/session-thread.cc | 37 +++++++++++++++++------------- libtransmission/session-thread.h | 14 +++++++---- libtransmission/session.cc | 23 ++++++++++--------- libtransmission/session.h | 9 ++++---- libtransmission/torrent.cc | 18 +++++++-------- libtransmission/webseed.cc | 2 +- tests/libtransmission/move-test.cc | 2 +- 8 files changed, 61 insertions(+), 50 deletions(-) diff --git a/libtransmission/rpc-server.cc b/libtransmission/rpc-server.cc index d557816b7..5dbc0b4b3 100644 --- a/libtransmission/rpc-server.cc +++ b/libtransmission/rpc-server.cc @@ -759,7 +759,7 @@ void tr_rpc_server::set_enabled(bool is_enabled) { is_enabled_ = is_enabled; - session->runInSessionThread( + session->run_in_session_thread( [this]() { if (!is_enabled_) @@ -784,7 +784,7 @@ void tr_rpc_server::set_port(tr_port port) noexcept if (is_enabled()) { - session->runInSessionThread(&restart_server, this); + session->run_in_session_thread(&restart_server, this); } } @@ -894,7 +894,7 @@ void tr_rpc_server::load(tr_variant const& src) { auto const rpc_uri = bind_address_->to_string(this->port()) + this->url_; tr_logAddInfo(fmt::format(_("Serving RPC and Web requests on {address}"), fmt::arg("address", rpc_uri))); - session->runInSessionThread(start_server, this); + session->run_in_session_thread(start_server, this); if (this->is_whitelist_enabled()) { diff --git a/libtransmission/session-thread.cc b/libtransmission/session-thread.cc index 92e4596de..674c98548 100644 --- a/libtransmission/session-thread.cc +++ b/libtransmission/session-thread.cc @@ -107,7 +107,7 @@ unsigned long thread_current_id() return hashed; } -void initEvthreadsOnce() +void init_evthreads_once() { evthread_lock_callbacks constexpr LockCbs{ EVTHREAD_LOCK_API_VERSION, EVTHREAD_LOCKTYPE_RECURSIVE, lock_alloc, lock_free, lock_lock, lock_unlock @@ -126,7 +126,7 @@ void initEvthreadsOnce() } // namespace tr_evthread_init_helpers -auto makeEventBase() +auto make_event_base() { tr_session_thread::tr_evthread_init(); @@ -142,7 +142,7 @@ void tr_session_thread::tr_evthread_init() using namespace tr_evthread_init_helpers; static auto evthread_flag = std::once_flag{}; - std::call_once(evthread_flag, initEvthreadsOnce); + std::call_once(evthread_flag, init_evthreads_once); } class tr_session_thread_impl final : public tr_session_thread @@ -152,7 +152,7 @@ public: { auto lock = std::unique_lock(is_looping_mutex_); - thread_ = std::thread(&tr_session_thread_impl::sessionThreadFunc, this, event_base()); + thread_ = std::thread(&tr_session_thread_impl::session_thread_func, this, event_base()); thread_id_ = thread_.get_id(); // wait for the session thread's main loop to start @@ -170,7 +170,7 @@ public: TR_ASSERT(is_looping_); // Stop the first event loop. This is the steady-state loop that runs - // continuously, even when there are no events. See: sessionThreadFunc() + // continuously, even when there are no events. See: session_thread_func() is_shutting_down_ = true; event_base_loopexit(event_base(), nullptr); @@ -193,6 +193,15 @@ public: return thread_id_ == std::this_thread::get_id(); } + void queue(std::function&& func) override + { + work_queue_mutex_.lock(); + work_queue_.emplace_back(std::move(func)); + work_queue_mutex_.unlock(); + + event_active(work_queue_event_.get(), 0, {}); + } + void run(std::function&& func) override { if (am_in_session_thread()) @@ -201,11 +210,7 @@ public: } else { - work_queue_mutex_.lock(); - work_queue_.emplace_back(std::move(func)); - work_queue_mutex_.unlock(); - - event_active(work_queue_event_.get(), 0, {}); + queue(std::move(func)); } } @@ -213,7 +218,7 @@ private: using callback = std::function; using work_queue_t = std::list; - void sessionThreadFunc(struct event_base* evbase) + void session_thread_func(struct event_base* evbase) { #ifndef _WIN32 /* Don't exit when writing on a broken socket */ @@ -247,11 +252,11 @@ private: ToggleLooping({}, {}, this); } - static void onWorkAvailableStatic(evutil_socket_t /*fd*/, short /*flags*/, void* vself) + static void on_work_available_static(evutil_socket_t /*fd*/, short /*flags*/, void* vself) { - static_cast(vself)->onWorkAvailable(); + static_cast(vself)->on_work_available(); } - void onWorkAvailable() + void on_work_available() { TR_ASSERT(am_in_session_thread()); @@ -268,9 +273,9 @@ private: } } - libtransmission::evhelpers::evbase_unique_ptr const evbase_{ makeEventBase() }; + libtransmission::evhelpers::evbase_unique_ptr const evbase_{ make_event_base() }; libtransmission::evhelpers::event_unique_ptr const work_queue_event_{ - event_new(evbase_.get(), -1, 0, onWorkAvailableStatic, this) + event_new(evbase_.get(), -1, 0, on_work_available_static, this) }; work_queue_t work_queue_; diff --git a/libtransmission/session-thread.h b/libtransmission/session-thread.h index f991d78a6..69980e7e9 100644 --- a/libtransmission/session-thread.h +++ b/libtransmission/session-thread.h @@ -28,15 +28,19 @@ public: [[nodiscard]] virtual bool am_in_session_thread() const noexcept = 0; + virtual void queue(std::function&& func) = 0; + virtual void run(std::function&& func) = 0; + template + void queue(Func&& func, Args&&... args) + { + queue(std::function{ std::bind(std::forward(func), std::forward(args)...) }); + } + template void run(Func&& func, Args&&... args) { - run(std::function{ - [func = std::forward(func), args = std::make_tuple(std::forward(args)...)]() - { - std::apply(std::move(func), std::move(args)); - } }); + run(std::function{ std::bind(std::forward(func), std::forward(args)...) }); } }; diff --git a/libtransmission/session.cc b/libtransmission/session.cc index af66ac7af..2d89fc1d4 100644 --- a/libtransmission/session.cc +++ b/libtransmission/session.cc @@ -354,7 +354,7 @@ void tr_session::WebMediator::notifyBandwidthConsumed(int torrent_id, size_t byt void tr_session::WebMediator::run(tr_web::FetchDoneFunc&& func, tr_web::FetchResponse&& response) const { - session_->runInSessionThread(std::move(func), std::move(response)); + session_->run_in_session_thread(std::move(func), std::move(response)); } time_t tr_session::WebMediator::now() const @@ -573,7 +573,7 @@ tr_session* tr_sessionInit(char const* config_dir, bool message_queueing_enabled // run initImpl() in the libtransmission thread auto data = tr_session::init_data{ message_queueing_enabled, config_dir, settings }; auto lock = session->unique_lock(); - session->runInSessionThread([&session, &data]() { session->initImpl(data); }); + session->run_in_session_thread([&session, &data]() { session->initImpl(data); }); data.done_cv.wait(lock); // wait for the session to be ready return session; @@ -876,7 +876,7 @@ void tr_sessionSet(tr_session* session, tr_variant const& settings) // do the work in the session thread auto done_promise = std::promise{}; auto done_future = done_promise.get_future(); - session->runInSessionThread( + session->run_in_session_thread( [&session, &settings, &done_promise]() { session->setSettings(settings, false); @@ -962,7 +962,7 @@ void tr_sessionSetPeerPort(tr_session* session, uint16_t hport) if (auto const port = tr_port::from_host(hport); port != session->localPeerPort()) { - session->runInSessionThread( + session->run_in_session_thread( [session, port]() { auto settings = session->settings_; @@ -1112,7 +1112,7 @@ void tr_session::AltSpeedMediator::is_active_changed(bool is_active, tr_session_ } }; - session_.runInSessionThread(in_session_thread); + session_.run_in_session_thread(in_session_thread); } // --- Session primary speed limits @@ -1412,7 +1412,8 @@ void tr_sessionClose(tr_session* session, size_t timeout_secs) auto closed_promise = std::promise{}; auto closed_future = closed_promise.get_future(); auto const deadline = std::chrono::steady_clock::now() + std::chrono::seconds{ timeout_secs }; - session->runInSessionThread([&closed_promise, deadline, session]() { session->closeImplPart1(&closed_promise, deadline); }); + session->run_in_session_thread([&closed_promise, deadline, session]() + { session->closeImplPart1(&closed_promise, deadline); }); closed_future.wait(); delete session; @@ -1469,7 +1470,7 @@ size_t tr_sessionLoadTorrents(tr_session* session, tr_ctor* ctor) auto loaded_promise = std::promise{}; auto loaded_future = loaded_promise.get_future(); - session->runInSessionThread(session_load_torrents, session, ctor, &loaded_promise); + session->run_in_session_thread(session_load_torrents, session, ctor, &loaded_promise); loaded_future.wait(); auto const n_torrents = loaded_future.get(); @@ -1518,7 +1519,7 @@ void tr_sessionSetDHTEnabled(tr_session* session, bool enabled) if (enabled != session->allowsDHT()) { - session->runInSessionThread( + session->run_in_session_thread( [session, enabled]() { auto settings = session->settings_; @@ -1555,7 +1556,7 @@ void tr_sessionSetUTPEnabled(tr_session* session, bool enabled) return; } - session->runInSessionThread( + session->run_in_session_thread( [session, enabled]() { auto settings = session->settings_; @@ -1570,7 +1571,7 @@ void tr_sessionSetLPDEnabled(tr_session* session, bool enabled) if (enabled != session->allowsLPD()) { - session->runInSessionThread( + session->run_in_session_thread( [session, enabled]() { auto settings = session->settings_; @@ -1655,7 +1656,7 @@ tr_bandwidth& tr_session::getBandwidthGroup(std::string_view name) void tr_sessionSetPortForwardingEnabled(tr_session* session, bool enabled) { - session->runInSessionThread( + session->run_in_session_thread( [session, enabled]() { session->settings_.port_forwarding_enabled = enabled; diff --git a/libtransmission/session.h b/libtransmission/session.h index 15bb4c376..fc9268907 100644 --- a/libtransmission/session.h +++ b/libtransmission/session.h @@ -366,15 +366,16 @@ public: return session_thread_->am_in_session_thread(); } - void runInSessionThread(std::function&& func) + template + void queue_session_thread(Func&& func, Args&&... args) { - session_thread_->run(std::move(func)); + session_thread_->queue(std::forward(func), std::forward(args)...); } template - void runInSessionThread(Func&& func, Args&&... args) + void run_in_session_thread(Func&& func, Args&&... args) { - session_thread_->run(std::forward(func), std::forward(args)...); + session_thread_->run(std::forward(func), std::forward(args)...); } [[nodiscard]] auto* event_base() noexcept diff --git a/libtransmission/torrent.cc b/libtransmission/torrent.cc index 2f063672b..126c60200 100644 --- a/libtransmission/torrent.cc +++ b/libtransmission/torrent.cc @@ -710,7 +710,7 @@ void tr_torrent::start(bool bypass_queue, std::optional has_any_local_data is_running_ = true; set_dirty(); - session->runInSessionThread([this]() { start_in_session_thread(); }); + session->run_in_session_thread([this]() { start_in_session_thread(); }); } void tr_torrent::start_in_session_thread() @@ -786,7 +786,7 @@ void tr_torrentStop(tr_torrent* tor) tor->start_when_stable_ = false; tor->set_dirty(); - tor->session->runInSessionThread([tor]() { tor->stop_now(); }); + tor->session->run_in_session_thread([tor]() { tor->stop_now(); }); } void tr_torrentRemove(tr_torrent* tor, bool delete_flag, tr_fileFunc delete_func, void* user_data) @@ -797,7 +797,7 @@ void tr_torrentRemove(tr_torrent* tor, bool delete_flag, tr_fileFunc delete_func tor->is_deleting_ = true; - tor->session->runInSessionThread(removeTorrentInSessionThread, tor, delete_flag, delete_func, user_data); + tor->session->run_in_session_thread(removeTorrentInSessionThread, tor, delete_flag, delete_func, user_data); } void tr_torrentFreeInSessionThread(tr_torrent* tor) @@ -1175,8 +1175,8 @@ void tr_torrent::set_location(std::string_view location, bool move_from_old_path *setme_state = TR_LOC_MOVING; } - session->runInSessionThread([this, loc = std::string(location), move_from_old_path, setme_state]() - { set_location_in_session_thread(loc, move_from_old_path, setme_state); }); + session->run_in_session_thread([this, loc = std::string(location), move_from_old_path, setme_state]() + { set_location_in_session_thread(loc, move_from_old_path, setme_state); }); } void tr_torrentSetLocation(tr_torrent* tor, char const* location, bool move_from_old_path, int volatile* setme_state) @@ -1263,7 +1263,7 @@ void tr_torrentManualUpdate(tr_torrent* tor) TR_ASSERT(tr_isTorrent(tor)); - tor->session->runInSessionThread(torrentManualUpdateImpl, tor); + tor->session->run_in_session_thread(torrentManualUpdateImpl, tor); } bool tr_torrentCanManualUpdate(tr_torrent const* tor) @@ -1539,7 +1539,7 @@ void tr_torrentStartNow(tr_torrent* tor) void tr_torrentVerify(tr_torrent* tor) { - tor->session->runInSessionThread( + tor->session->run_in_session_thread( [tor]() { TR_ASSERT(tor->session->am_in_session_thread()); @@ -1646,7 +1646,7 @@ void tr_torrent::VerifyMediator::on_verify_done(bool const aborted) if (!aborted && !tor_->is_deleting_) { - tor_->session->runInSessionThread( + tor_->session->run_in_session_thread( [tor = tor_]() { if (tor->is_deleting_) @@ -2493,7 +2493,7 @@ void tr_torrent::rename_path( tr_torrent_rename_done_func callback, void* callback_user_data) { - this->session->runInSessionThread( + this->session->run_in_session_thread( [this, oldpath = std::string(oldpath), newname = std::string(newname), callback, callback_user_data]() { rename_path_in_session_thread(oldpath, newname, callback, callback_user_data); }); } diff --git a/libtransmission/webseed.cc b/libtransmission/webseed.cc index 74dc52ebd..fbb04f17d 100644 --- a/libtransmission/webseed.cc +++ b/libtransmission/webseed.cc @@ -386,7 +386,7 @@ void useFetchedBlocks(tr_webseed_task* task) auto block_buf = std::make_unique(block_size); evbuffer_remove(task->content(), std::data(*block_buf), std::size(*block_buf)); auto* const data = new write_block_data{ session, tor->id(), task->loc.block, std::move(block_buf), webseed }; - session->runInSessionThread(&write_block_data::write_block_func, data); + session->run_in_session_thread(&write_block_data::write_block_func, data); } task->loc = tor->byte_loc(task->loc.byte + block_size); diff --git a/tests/libtransmission/move-test.cc b/tests/libtransmission/move-test.cc index 896528bf3..a48e382ba 100644 --- a/tests/libtransmission/move-test.cc +++ b/tests/libtransmission/move-test.cc @@ -106,7 +106,7 @@ TEST_P(IncompleteDirTest, incompleteDir) std::fill_n(std::data(*data.buf), tr_block_info::BlockSize, '\0'); data.block = block_index; data.done = false; - session_->runInSessionThread(test_incomplete_dir_threadfunc, &data); + session_->run_in_session_thread(test_incomplete_dir_threadfunc, &data); auto const test = [&data]() {