From 831eb8d40f9210bf03f2643fa2a6147ac400ad7a Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Fri, 4 Nov 2022 16:20:27 -0500 Subject: [PATCH] refactor: make `tr_session_thread` a unique_ptr owned by tr_session (#4069) --- Transmission.xcodeproj/project.pbxproj | 16 +- libtransmission/CMakeLists.txt | 4 +- libtransmission/peer-io.cc | 5 - libtransmission/peer-mgr.cc | 10 +- libtransmission/rpc-server.cc | 8 +- libtransmission/session-thread.cc | 296 +++++++++++++++++++++++++ libtransmission/session-thread.h | 42 ++++ libtransmission/session.cc | 179 +++++---------- libtransmission/session.h | 64 ++++-- libtransmission/timer-ev.cc | 44 ++-- libtransmission/torrent.cc | 56 ++--- libtransmission/torrent.h | 4 +- libtransmission/trevent.cc | 275 ----------------------- libtransmission/trevent.h | 39 ---- libtransmission/web.cc | 6 +- libtransmission/web.h | 2 +- libtransmission/webseed.cc | 3 +- tests/libtransmission/move-test.cc | 2 +- tests/libtransmission/test-fixtures.h | 3 +- 19 files changed, 504 insertions(+), 554 deletions(-) create mode 100644 libtransmission/session-thread.cc create mode 100644 libtransmission/session-thread.h delete mode 100644 libtransmission/trevent.cc delete mode 100644 libtransmission/trevent.h diff --git a/Transmission.xcodeproj/project.pbxproj b/Transmission.xcodeproj/project.pbxproj index e98bb0a71..c6bee06e5 100644 --- a/Transmission.xcodeproj/project.pbxproj +++ b/Transmission.xcodeproj/project.pbxproj @@ -153,8 +153,8 @@ A242AD9315F05D23002B3A6C /* Localizable.strings in Resources */ = {isa = PBXBuildFile; fileRef = A242AD9115F05D23002B3A6C /* Localizable.strings */; }; A2451E6916ACE4EB00586E0E /* FileRenameSheetController.mm in Sources */ = {isa = PBXBuildFile; fileRef = A2451E6716ACE4EB00586E0E /* FileRenameSheetController.mm */; }; A2451E6A16ACE4EB00586E0E /* FileRenameSheetController.xib in Resources */ = {isa = PBXBuildFile; fileRef = A2451E6816ACE4EB00586E0E /* FileRenameSheetController.xib */; }; - A24621410C769D0900088E81 /* trevent.h in Headers */ = {isa = PBXBuildFile; fileRef = A24621350C769CF400088E81 /* trevent.h */; }; - A24621420C769D0900088E81 /* trevent.cc in Sources */ = {isa = PBXBuildFile; fileRef = A24621360C769CF400088E81 /* trevent.cc */; }; + A24621410C769D0900088E81 /* session-thread.h in Headers */ = {isa = PBXBuildFile; fileRef = A24621350C769CF400088E81 /* session-thread.h */; }; + A24621420C769D0900088E81 /* session-thread.cc in Sources */ = {isa = PBXBuildFile; fileRef = A24621360C769CF400088E81 /* session-thread.cc */; }; A247A443114C701800547DFC /* InfoViewController.h in Headers */ = {isa = PBXBuildFile; fileRef = A247A442114C701800547DFC /* InfoViewController.h */; }; A24F19080A3A790800C9C145 /* Sparkle.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = A24F19070A3A790800C9C145 /* Sparkle.framework */; settings = {ATTRIBUTES = (Weak, ); }; }; A24F19210A3A796800C9C145 /* Sparkle.framework in Copy Files */ = {isa = PBXBuildFile; fileRef = A24F19070A3A790800C9C145 /* Sparkle.framework */; settings = {ATTRIBUTES = (CodeSignOnCopy, ); }; }; @@ -874,8 +874,8 @@ A2451E6616ACE4EB00586E0E /* FileRenameSheetController.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = FileRenameSheetController.h; sourceTree = ""; }; A2451E6716ACE4EB00586E0E /* FileRenameSheetController.mm */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.objcpp; path = FileRenameSheetController.mm; sourceTree = ""; }; A2451E6816ACE4EB00586E0E /* FileRenameSheetController.xib */ = {isa = PBXFileReference; lastKnownFileType = file.xib; path = FileRenameSheetController.xib; sourceTree = ""; }; - A24621350C769CF400088E81 /* trevent.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = trevent.h; sourceTree = ""; }; - A24621360C769CF400088E81 /* trevent.cc */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = trevent.cc; sourceTree = ""; }; + A24621350C769CF400088E81 /* session-thread.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = session-thread.h; sourceTree = ""; }; + A24621360C769CF400088E81 /* session-thread.cc */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = session-thread.cc; sourceTree = ""; }; A247A442114C701800547DFC /* InfoViewController.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = InfoViewController.h; sourceTree = ""; }; A24F19070A3A790800C9C145 /* Sparkle.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; path = Sparkle.framework; sourceTree = ""; }; A25485390EB66CBB004539DA /* codelength.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = codelength.h; sourceTree = ""; }; @@ -1730,8 +1730,8 @@ A2679292130E00A000CB7464 /* tr-utp.cc */, A2679293130E00A000CB7464 /* tr-utp.h */, BEFC1DF50C07861A00B0BB3C /* transmission.h */, - A24621360C769CF400088E81 /* trevent.cc */, - A24621350C769CF400088E81 /* trevent.h */, + A24621360C769CF400088E81 /* session-thread.cc */, + A24621350C769CF400088E81 /* session-thread.h */, BEFC1DF40C07861A00B0BB3C /* port-forwarding-upnp.cc */, BEFC1DF30C07861A00B0BB3C /* port-forwarding-upnp.h */, BEFC1DF20C07861A00B0BB3C /* utils.cc */, @@ -2176,7 +2176,7 @@ BEFC1E550C07861A00B0BB3C /* completion.h in Headers */, BEFC1E570C07861A00B0BB3C /* clients.h in Headers */, A2BE9C530C1E4AF7002D16E6 /* makemeta.h in Headers */, - A24621410C769D0900088E81 /* trevent.h in Headers */, + A24621410C769D0900088E81 /* session-thread.h in Headers */, 4D36BA700CA2F00800A63CA5 /* peer-mse.h in Headers */, C10C644E1D9AF328003C1B4C /* session-id.h in Headers */, 4D36BA730CA2F00800A63CA5 /* handshake.h in Headers */, @@ -2904,7 +2904,7 @@ BEFC1E560C07861A00B0BB3C /* completion.cc in Sources */, BEFC1E580C07861A00B0BB3C /* clients.cc in Sources */, A2BE9C520C1E4AF5002D16E6 /* makemeta.cc in Sources */, - A24621420C769D0900088E81 /* trevent.cc in Sources */, + A24621420C769D0900088E81 /* session-thread.cc in Sources */, C11DEA161FCD31C0009E22B9 /* subprocess-posix.cc in Sources */, 4D36BA6F0CA2F00800A63CA5 /* peer-mse.cc in Sources */, 4D36BA720CA2F00800A63CA5 /* handshake.cc in Sources */, diff --git a/libtransmission/CMakeLists.txt b/libtransmission/CMakeLists.txt index 315de36b7..8c6980418 100644 --- a/libtransmission/CMakeLists.txt +++ b/libtransmission/CMakeLists.txt @@ -53,6 +53,7 @@ set(PROJECT_FILES session-id.cc session-alt-speeds.cc session-settings.cc + session-thread.cc session.cc stats.cc subprocess-posix.cc @@ -71,7 +72,6 @@ set(PROJECT_FILES tr-lpd.cc tr-udp.cc tr-utp.cc - trevent.cc utils.cc variant-benc.cc variant-json.cc @@ -197,6 +197,7 @@ set(${PROJECT_NAME}_PRIVATE_HEADERS resume.h rpc-server.h session-alt-speeds.h + session-thread.h session.h stats.h subprocess.h @@ -208,7 +209,6 @@ set(${PROJECT_NAME}_PRIVATE_HEADERS tr-dht.h tr-lpd.h tr-utp.h - trevent.h variant-common.h verify.h version.h diff --git a/libtransmission/peer-io.cc b/libtransmission/peer-io.cc index bfd68c06d..5ff87ac5a 100644 --- a/libtransmission/peer-io.cc +++ b/libtransmission/peer-io.cc @@ -480,7 +480,6 @@ std::shared_ptr tr_peerIo::create( struct tr_peer_socket const socket) { TR_ASSERT(session != nullptr); - TR_ASSERT(session->events != nullptr); auto lock = session->unique_lock(); TR_ASSERT(isSupportedSocket(socket)); @@ -596,7 +595,6 @@ std::shared_ptr tr_peerIo::newOutgoing( static void event_enable(tr_peerIo* io, short event) { TR_ASSERT(io->session != nullptr); - TR_ASSERT(io->session->events != nullptr); bool const need_events = io->socket.type == TR_PEER_SOCKET_TYPE_TCP; @@ -633,8 +631,6 @@ static void event_enable(tr_peerIo* io, short event) static void event_disable(tr_peerIo* io, short event) { - TR_ASSERT(io->session->events != nullptr); - bool const need_events = io->socket.type == TR_PEER_SOCKET_TYPE_TCP; if (need_events) @@ -730,7 +726,6 @@ static void io_close_socket(tr_peerIo* io) tr_peerIo::~tr_peerIo() { auto const lock = session->unique_lock(); - TR_ASSERT(session->events != nullptr); clearCallbacks(); tr_logAddTraceIo(this, "in tr_peerIo destructor"); diff --git a/libtransmission/peer-mgr.cc b/libtransmission/peer-mgr.cc index e6d172f32..80ac4f252 100644 --- a/libtransmission/peer-mgr.cc +++ b/libtransmission/peer-mgr.cc @@ -565,11 +565,11 @@ struct tr_peerMgr explicit tr_peerMgr(tr_session* session_in) : session{ session_in } , bandwidth_timer_{ session->timerMaker().create([this]() { bandwidthPulse(); }) } - , rechoke_timer_{ session->timerMaker().create([this]() { rechokePulseMarshall(); }) } + , rechoke_timer_{ session->timerMaker().create([this]() { rechokePulse(); }) } , refill_upkeep_timer_{ session->timerMaker().create([this]() { refillUpkeep(); }) } { bandwidth_timer_->startRepeating(BandwidthPeriod); - rechoke_timer_->startSingleShot(RechokePeriod); + rechoke_timer_->startRepeating(RechokePeriod); refill_upkeep_timer_->startRepeating(RefillUpkeepPeriod); } @@ -604,12 +604,6 @@ struct tr_peerMgr Handshakes incoming_handshakes; private: - void rechokePulseMarshall() - { - rechokePulse(); - rechoke_timer_->startSingleShot(RechokePeriod); - } - std::unique_ptr const bandwidth_timer_; std::unique_ptr const rechoke_timer_; std::unique_ptr const refill_upkeep_timer_; diff --git a/libtransmission/rpc-server.cc b/libtransmission/rpc-server.cc index 58820d109..1b8cbed36 100644 --- a/libtransmission/rpc-server.cc +++ b/libtransmission/rpc-server.cc @@ -45,7 +45,6 @@ #include "timer.h" #include "tr-assert.h" #include "tr-strbuf.h" -#include "trevent.h" #include "utils.h" #include "variant.h" #include "web-utils.h" @@ -755,8 +754,7 @@ void tr_rpc_server::setEnabled(bool is_enabled) { is_enabled_ = is_enabled; - tr_runInEventThread( - this->session, + session->runInSessionThread( [this]() { if (!is_enabled_) @@ -790,7 +788,7 @@ void tr_rpc_server::setPort(tr_port port) noexcept if (isEnabled()) { - tr_runInEventThread(session, restartServer, this); + session->runInSessionThread(&restartServer, this); } } @@ -931,7 +929,7 @@ void tr_rpc_server::load(tr_variant* src) { auto const rpc_uri = tr_rpc_address_with_port(this) + this->url_; tr_logAddInfo(fmt::format(_("Serving RPC and Web requests on {address}"), fmt::arg("address", rpc_uri))); - tr_runInEventThread(session, startServer, this); + session->runInSessionThread(startServer, this); if (this->isWhitelistEnabled()) { diff --git a/libtransmission/session-thread.cc b/libtransmission/session-thread.cc new file mode 100644 index 000000000..cb72edf0d --- /dev/null +++ b/libtransmission/session-thread.cc @@ -0,0 +1,296 @@ +// This file Copyright © 2007-2022 Mnemosyne LLC. +// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only), +// or any future license endorsed by Mnemosyne LLC. +// License text can be found in the licenses/ folder. + +#include +#include +#include +#include +#include +#include +#include +#include // for std::move(), std::swap() + +#include + +#ifdef _WIN32 +#include +#endif + +#include +#include + +#include "transmission.h" + +#include "log.h" +#include "session-thread.h" +#include "tr-assert.h" +#include "utils.h" // for tr_net_init() + +using namespace std::literals; + +/// + +namespace +{ +namespace tr_evthread_init_helpers +{ +void* lock_alloc(unsigned /*locktype*/) +{ + return new std::recursive_mutex{}; +} + +void lock_free(void* vlock, unsigned /*locktype*/) +{ + delete static_cast(vlock); +} + +int lock_lock(unsigned mode, void* vlock) +{ + auto* lock = static_cast(vlock); + if ((mode & EVTHREAD_TRY) != 0U) + { + auto const success = lock->try_lock(); + return success ? 0 : -1; + } + lock->lock(); + return 0; +} + +int lock_unlock(unsigned /*mode*/, void* vlock) +{ + static_cast(vlock)->unlock(); + return 0; +} + +void* cond_alloc(unsigned /*condflags*/) +{ + return new std::condition_variable_any(); +} + +void cond_free(void* vcond) +{ + delete static_cast(vcond); +} + +int cond_signal(void* vcond, int broadcast) +{ + auto* cond = static_cast(vcond); + if (broadcast != 0) + { + cond->notify_all(); + } + else + { + cond->notify_one(); + } + return 0; +} + +int cond_wait(void* vcond, void* vlock, struct timeval const* tv) +{ + auto* cond = static_cast(vcond); + auto* lock = static_cast(vlock); + if (tv == nullptr) + { + cond->wait(*lock); + return 0; + } + + auto const duration = std::chrono::seconds(tv->tv_sec) + std::chrono::microseconds(tv->tv_usec); + auto const success = cond->wait_for(*lock, duration); + return success == std::cv_status::timeout ? 1 : 0; +} + +unsigned long thread_current_id() +{ + return std::hash()(std::this_thread::get_id()); +} + +auto evthread_flag = std::once_flag{}; + +void initEvthreadsOnce() +{ + tr_net_init(); + + evthread_lock_callbacks constexpr LockCbs{ + EVTHREAD_LOCK_API_VERSION, EVTHREAD_LOCKTYPE_RECURSIVE, lock_alloc, lock_free, lock_lock, lock_unlock + }; + evthread_set_lock_callbacks(&LockCbs); + + evthread_condition_callbacks constexpr CondCbs{ EVTHREAD_CONDITION_API_VERSION, + cond_alloc, + cond_free, + cond_signal, + cond_wait }; + evthread_set_condition_callbacks(&CondCbs); + + evthread_set_id_callback(thread_current_id); +} + +} // namespace tr_evthread_init_helpers + +auto makeEventBase() +{ + tr_session_thread::tr_evthread_init(); + + return std::unique_ptr{ event_base_new(), event_base_free }; +} + +} // namespace + +/// + +void tr_session_thread::tr_evthread_init() +{ + using namespace tr_evthread_init_helpers; + std::call_once(evthread_flag, initEvthreadsOnce); +} + +class tr_session_thread_impl final : public tr_session_thread +{ +public: + explicit tr_session_thread_impl() + : evbase_{ makeEventBase() } + , work_queue_event_{ event_new(evbase_.get(), -1, 0, onWorkAvailableStatic, this), event_free } + { + auto lock = std::unique_lock(is_looping_mutex_); + + thread_ = std::thread(&tr_session_thread_impl::sessionThreadFunc, this, eventBase()); + thread_id_ = thread_.get_id(); + + // wait for the session thread's main loop to start + is_looping_cv_.wait(lock, [this]() { return is_looping_; }); + } + + tr_session_thread_impl(tr_session_thread_impl&&) = delete; + tr_session_thread_impl(tr_session_thread_impl const&) = delete; + tr_session_thread_impl& operator=(tr_session_thread_impl&&) = delete; + tr_session_thread_impl& operator=(tr_session_thread_impl const&) = delete; + + ~tr_session_thread_impl() override + { + TR_ASSERT(!amInSessionThread()); + TR_ASSERT(is_looping_); + + // Stop the first event loop. This is the steady-state loop that runs + // continuously, even when there are no events. See: sessionThreadFunc() + is_shutting_down_ = true; + event_base_loopexit(eventBase(), nullptr); + + // Wait on the second event loop. This is the shutdown loop that exits + // as soon as there are no events. This step is to give pending tasks + // a chance to finish. + auto lock = std::unique_lock(is_looping_mutex_); + is_looping_cv_.wait_for(lock, Deadline, [this]() { return !is_looping_; }); + event_base_loopexit(eventBase(), nullptr); + thread_.join(); + } + + [[nodiscard]] struct event_base* eventBase() noexcept override + { + return evbase_.get(); + } + + [[nodiscard]] bool amInSessionThread() const noexcept override + { + return thread_id_ == std::this_thread::get_id(); + } + + void run(std::function&& func) override + { + if (amInSessionThread()) + { + func(); + } + else + { + work_queue_mutex_.lock(); + work_queue_.emplace_back(std::move(func)); + work_queue_mutex_.unlock(); + + event_active(work_queue_event_.get(), 0, {}); + } + } + +private: + using callback = std::function; + using work_queue_t = std::list; + + void sessionThreadFunc(struct event_base* evbase) + { +#ifndef _WIN32 + /* Don't exit when writing on a broken socket */ + (void)signal(SIGPIPE, SIG_IGN); +#endif + tr_evthread_init(); + + constexpr auto ToggleLooping = [](evutil_socket_t, short, void* vself) + { + auto* const self = static_cast(vself); + self->is_looping_mutex_.lock(); + self->is_looping_ = !self->is_looping_; + self->is_looping_mutex_.unlock(); + + self->is_looping_cv_.notify_one(); + }; + + event_base_once(evbase, -1, EV_TIMEOUT, ToggleLooping, this, nullptr); + + // Start the first event loop. This is the steady-state loop that runs + // continuously until `this` is destroyed. See: ~tr_session_thread_impl() + TR_ASSERT(!is_shutting_down_); + event_base_loop(evbase, EVLOOP_NO_EXIT_ON_EMPTY); + + // Start the second event loop. This is the shutdown loop that exits as + // soon as there are no events. It's used to give any remaining events + // a chance to finish up before we exit. + TR_ASSERT(is_shutting_down_); + event_base_loop(evbase, 0); + + ToggleLooping({}, {}, this); + } + + static void onWorkAvailableStatic(evutil_socket_t /*fd*/, short /*flags*/, void* vself) + { + static_cast(vself)->onWorkAvailable(); + } + void onWorkAvailable() + { + TR_ASSERT(amInSessionThread()); + + // steal the work queue + auto work_queue_lock = std::unique_lock(work_queue_mutex_); + auto work_queue = work_queue_t{}; + std::swap(work_queue, work_queue_); + work_queue_lock.unlock(); + + // process the work queue + for (auto const& func : work_queue) + { + func(); + } + } + + std::unique_ptr const evbase_; + std::unique_ptr const work_queue_event_; + + work_queue_t work_queue_; + std::mutex work_queue_mutex_; + + std::thread thread_; + std::thread::id thread_id_; + + std::mutex is_looping_mutex_; + std::condition_variable is_looping_cv_; + bool is_looping_ = false; + + bool is_shutting_down_ = false; + static constexpr std::chrono::seconds Deadline = 5s; +}; + +std::unique_ptr tr_session_thread::create() +{ + return std::make_unique(); +} diff --git a/libtransmission/session-thread.h b/libtransmission/session-thread.h new file mode 100644 index 000000000..2eecafd09 --- /dev/null +++ b/libtransmission/session-thread.h @@ -0,0 +1,42 @@ +// This file Copyright © 2007-2022 Mnemosyne LLC. +// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only), +// or any future license endorsed by Mnemosyne LLC. +// License text can be found in the licenses/ folder. + +#pragma once + +#ifndef __TRANSMISSION__ +#error only libtransmission should #include this header. +#endif + +#include +#include +#include +#include + +struct event_base; + +class tr_session_thread +{ +public: + static void tr_evthread_init(); + + static std::unique_ptr create(); + virtual ~tr_session_thread() = default; + + [[nodiscard]] virtual struct event_base* eventBase() noexcept = 0; + + [[nodiscard]] virtual bool amInSessionThread() const noexcept = 0; + + virtual void run(std::function&& func) = 0; + + template + void run(Func&& func, Args&&... args) + { + run(std::function{ + [func = std::forward(func), args = std::make_tuple(std::forward(args)...)]() + { + std::apply(std::move(func), std::move(args)); + } }); + } +}; diff --git a/libtransmission/session.cc b/libtransmission/session.cc index 1ebf89af3..f26b6852b 100644 --- a/libtransmission/session.cc +++ b/libtransmission/session.cc @@ -55,7 +55,6 @@ #include "tr-lpd.h" #include "tr-strbuf.h" #include "tr-utp.h" -#include "trevent.h" #include "utils.h" #include "variant.h" #include "verify.h" @@ -227,7 +226,7 @@ void tr_session::WebMediator::notifyBandwidthConsumed(int torrent_id, size_t byt void tr_session::WebMediator::run(tr_web::FetchDoneFunc&& func, tr_web::FetchResponse&& response) const { - tr_runInEventThread(session_, std::move(func), std::move(response)); + session_->runInSessionThread(std::move(func), std::move(response)); } void tr_sessionFetch(tr_session* session, tr_web::FetchOptions&& options) @@ -449,27 +448,15 @@ tr_session* tr_sessionInit(char const* config_dir, bool message_queueing_enabled tr_logSetLevel(static_cast(val)); } - // start the libtransmission thread - tr_net_init(); // must go before tr_eventInit - tr_eventInit(session); - TR_ASSERT(session->events != nullptr); - auto data = tr_session::init_data{}; data.config_dir = config_dir; data.message_queuing_enabled = message_queueing_enabled; data.client_settings = client_settings; - // run it in the libtransmission thread - if (tr_amInEventThread(session)) - { - session->initImpl(data); - } - else - { - auto lock = session->unique_lock(); - tr_runInEventThread(session, [&session, &data]() { session->initImpl(data); }); - data.done_cv.wait(lock); // wait for the session to be ready - } + // run initImpl() in the libtransmission thread + auto lock = session->unique_lock(); + session->runInSessionThread([&session, &data]() { session->initImpl(data); }); + data.done_cv.wait(lock); // wait for the session to be ready return session; } @@ -515,7 +502,7 @@ void tr_session::onNowTimer() void tr_session::initImpl(init_data& data) { auto lock = unique_lock(); - TR_ASSERT(tr_amInEventThread(this)); + TR_ASSERT(amInSessionThread()); auto* const client_settings = data.client_settings; TR_ASSERT(tr_variantIsDict(client_settings)); @@ -560,7 +547,7 @@ static void updateBandwidth(tr_session* session, tr_direction dir); void tr_session::setSettings(tr_variant* settings_dict, bool force) { - TR_ASSERT(tr_amInEventThread(this)); + TR_ASSERT(amInSessionThread()); auto* const settings = settings_dict; TR_ASSERT(tr_variantIsDict(settings)); @@ -655,7 +642,7 @@ void tr_sessionSet(tr_session* session, tr_variant* settings) { // run it in the libtransmission thread - if (tr_amInEventThread(session)) + if (session->amInSessionThread()) { session->setSettings(settings, false); } @@ -664,8 +651,7 @@ void tr_sessionSet(tr_session* session, tr_variant* settings) auto lock = session->unique_lock(); auto done_cv = std::condition_variable_any{}; - tr_runInEventThread( - session, + session->runInSessionThread( [&session, &settings, &done_cv]() { session->setSettings(settings, false); @@ -784,7 +770,7 @@ void tr_session::setPeerPort(tr_port port_in) } }; - tr_runInEventThread(this, in_session_thread, port_in); + runInSessionThread(in_session_thread, port_in); } void tr_sessionSetPeerPort(tr_session* session, uint16_t hport) @@ -947,7 +933,7 @@ void tr_session::AltSpeedMediator::isActiveChanged(bool is_active, tr_session_al } }; - tr_runInEventThread(&session_, in_session_thread); + session_.runInSessionThread(in_session_thread); } /*** @@ -1178,27 +1164,26 @@ double tr_sessionGetRawSpeed_KBps(tr_session const* session, tr_direction dir) return tr_toSpeedKBps(tr_sessionGetRawSpeed_Bps(session, dir)); } -void tr_session::closeImplStart() +void tr_session::closeImplPart1() { is_closing_ = true; + // close the low-hanging fruit that can be closed immediately w/o consequences + verifier_.reset(); + save_timer_.reset(); + now_timer_.reset(); + rpc_server_.reset(); lpd_.reset(); + port_forwarding_.reset(); + closePeerPort(); + // tell other items to start shutting down udp_core_->startShutdown(); announcer_udp_->startShutdown(); - save_timer_.reset(); - now_timer_.reset(); - - verifier_.reset(); - port_forwarding_.reset(); - - closePeerPort(); - this->rpc_server_.reset(); - - /* Close the torrents. Get the most active ones first so that - * if we can't get them all closed in a reasonable amount of time, - * at least we get the most important ones first. */ + // Close the torrents in order of most active to least active + // so that the most important announce=stopped events are + // fired out first... auto torrents = getAllTorrents(); std::sort( std::begin(torrents), @@ -1209,129 +1194,87 @@ void tr_session::closeImplStart() auto const b_cur = b->downloadedCur + b->uploadedCur; return a_cur > b_cur; // larger xfers go first }); - for (auto* tor : torrents) { - tr_torrentFree(tor); + tr_torrentFreeInSessionThread(tor); } - torrents.clear(); - - /* Close the announcer *after* closing the torrents - so that all the &event=stopped messages will be - queued to be sent by tr_announcerClose() */ + // ...and now that all the torrents have been closed, any + // remaining `event=stopped` announce messages are queued in + // the announcer. The announcer's destructor sends all those + // out via `web_`... tr_announcerClose(this); - - /* and this goes *after* announcer close so that - it won't be idle until the announce events are sent... */ - this->web_->closeSoon(); - + // ...and now that those are queued, tell web_ that we're + // shutting down soon. This leaves the `event=stopped` messages + // in the queue but refuses to take any _new_ tasks + this->web_->startShutdown(); this->cache.reset(); - /* saveTimer is not used at this point, reusing for UDP shutdown wait */ + // recycle the now-unused save_timer_ here to wait for UDP shutdown TR_ASSERT(!save_timer_); - save_timer_ = timerMaker().create([this]() { closeImplWaitForIdleUdp(); }); - save_timer_->start(1ms); + save_timer_ = timerMaker().create([this]() { closeImplPart2(); }); + save_timer_->startRepeating(50ms); } -void tr_session::closeImplWaitForIdleUdp() +void tr_session::closeImplPart2() { - /* gotta keep udp running long enough to send out all - the &event=stopped UDP tracker messages */ + // try to keep the UDP announcer alive long enough to send out + // all the &event=stopped tracker announces if (announcer_udp_ && !announcer_udp_->isIdle()) { announcer_udp_->upkeep(); - save_timer_->start(100ms); return; } - closeImplFinish(); -} - -void tr_session::closeImplFinish() -{ save_timer_.reset(); - /* we had to wait until UDP trackers were closed before closing these: */ this->announcer_udp_.reset(); this->udp_core_.reset(); stats().saveIfDirty(); peer_mgr_.reset(); tr_utpClose(this); - blocklists_.clear(); openFiles().closeAll(); is_closed_ = true; } -static bool deadlineReached(time_t const deadline) -{ - return time(nullptr) >= deadline; -} - -static auto constexpr ShutdownMaxSeconds = time_t{ 20 }; - void tr_sessionClose(tr_session* session) { TR_ASSERT(session != nullptr); - time_t const deadline = time(nullptr) + ShutdownMaxSeconds; + static auto constexpr DeadlineSecs = 10s; + auto const deadline = std::chrono::steady_clock::now() + DeadlineSecs; + auto const deadline_reached = [deadline]() + { + return std::chrono::steady_clock::now() >= deadline; + }; tr_logAddInfo(fmt::format(_("Transmission version {version} shutting down"), fmt::arg("version", LONG_VERSION_STRING))); - tr_logAddDebug(fmt::format("now is {}, deadline is {}", time(nullptr), deadline)); /* close the session */ - tr_runInEventThread(session, [session]() { session->closeImplStart(); }); + session->runInSessionThread([session]() { session->closeImplPart1(); }); - while (!session->isClosed() && !deadlineReached(deadline)) + while (!session->isClosed() && !deadline_reached()) { tr_logAddTrace("waiting for the libtransmission thread to finish"); tr_wait_msec(10); } - /* "port_forwarding" and "tracker" have live sockets, - * so we need to keep the transmission thread alive - * for a bit while they tell the router & tracker - * that we're closing now */ - while ( - (session->port_forwarding_ || !session->web_->isClosed() || session->announcer != nullptr || session->announcer_udp_) && - !deadlineReached(deadline)) + // There's usually a bit of housekeeping to do during shutdown, + // e.g. sending out `event=stopped` announcements to trackers, + // so wait a bit for the session thread to close. + while (!deadline_reached() && (!session->web_->isClosed() || session->announcer != nullptr || session->announcer_udp_)) { tr_logAddTrace(fmt::format( - "waiting on port unmap ({}) or announcer ({})... now {} deadline {}", + "waiting on port unmap ({}) or announcer ({})... now {}", fmt::ptr(session->port_forwarding_.get()), fmt::ptr(session->announcer), - time(nullptr), - deadline)); + time(nullptr))); tr_wait_msec(50); } session->web_.reset(); - /* close the libtransmission thread */ - tr_eventClose(session); - - while (session->events != nullptr) - { - static bool forced = false; - tr_logAddTrace( - fmt::format("waiting for libtransmission thread to finish... now {} deadline {}", time(nullptr), deadline)); - tr_wait_msec(10); - - if (deadlineReached(deadline) && !forced) - { - tr_logAddTrace("calling event_loopbreak()"); - forced = true; - event_base_loopbreak(session->eventBase()); - } - - if (deadlineReached(deadline + 3)) - { - tr_logAddTrace("deadline+3 reached... calling break..."); - break; - } - } - delete session; } @@ -1395,7 +1338,7 @@ size_t tr_sessionLoadTorrents(tr_session* session, tr_ctor* ctor) data.session = session; data.ctor = ctor; data.done = false; - tr_runInEventThread(session, sessionLoadTorrents, &data); + session->runInSessionThread(sessionLoadTorrents, &data); while (!data.done) { tr_wait_msec(100); @@ -1451,8 +1394,7 @@ void tr_sessionSetDHTEnabled(tr_session* session, bool enabled) return; } - tr_runInEventThread( - session, + session->runInSessionThread( [session, enabled]() { session->udp_core_.reset(); @@ -1502,8 +1444,7 @@ void tr_sessionSetLPDEnabled(tr_session* session, bool enabled) return; } - tr_runInEventThread( - session, + session->runInSessionThread( [session, enabled]() { session->lpd_.reset(); @@ -1598,7 +1539,7 @@ tr_bandwidth& tr_session::getBandwidthGroup(std::string_view name) void tr_sessionSetPortForwardingEnabled(tr_session* session, bool enabled) { - tr_runInEventThread(session, [session, enabled]() { session->port_forwarding_->setEnabled(enabled); }); + session->runInSessionThread([session, enabled]() { session->port_forwarding_->setEnabled(enabled); }); } bool tr_sessionIsPortForwardingEnabled(tr_session const* session) @@ -2220,12 +2161,6 @@ auto makeBlocklistDir(std::string_view config_dir) return dir; } -auto makeEventBase() -{ - tr_evthread_init(); - return std::unique_ptr{ event_base_new(), event_base_free }; -} - } // namespace tr_session::tr_session(std::string_view config_dir, tr_variant* settings_dict) @@ -2233,7 +2168,7 @@ tr_session::tr_session(std::string_view config_dir, tr_variant* settings_dict) , resume_dir_{ makeResumeDir(config_dir) } , torrent_dir_{ makeTorrentDir(config_dir) } , blocklist_dir_{ makeBlocklistDir(config_dir) } - , event_base_{ makeEventBase() } + , session_thread_{ tr_session_thread::create() } , timer_maker_{ std::make_unique(eventBase()) } , settings_{ settings_dict } , session_id_{ tr_time } diff --git a/libtransmission/session.h b/libtransmission/session.h index cf9012856..55987ba36 100644 --- a/libtransmission/session.h +++ b/libtransmission/session.h @@ -38,6 +38,7 @@ #include "session-alt-speeds.h" #include "session-id.h" #include "session-settings.h" +#include "session-thread.h" #include "stats.h" #include "torrents.h" #include "tr-lpd.h" @@ -51,6 +52,7 @@ struct event_base; class tr_lpd; class tr_port_forwarding; class tr_rpc_server; +class tr_session_thread; class tr_web; struct struct_utp_context; struct tr_announcer; @@ -272,16 +274,32 @@ public: return session_id_.sv(); } - [[nodiscard]] event_base* eventBase() noexcept - { - return event_base_.get(); - } - [[nodiscard]] libtransmission::TimerMaker& timerMaker() noexcept { return *timer_maker_; } + [[nodiscard]] auto amInSessionThread() noexcept + { + return session_thread_->amInSessionThread(); + } + + void runInSessionThread(std::function&& func) + { + session_thread_->run(std::move(func)); + } + + template + void runInSessionThread(Func&& func, Args&&... args) + { + session_thread_->run(std::move(func), std::move(args)...); + } + + [[nodiscard]] auto eventBase() noexcept + { + return session_thread_->eventBase(); + } + [[nodiscard]] constexpr auto& torrents() { return torrents_; @@ -880,9 +898,8 @@ private: void initImpl(init_data&); void setSettings(tr_variant* settings_dict, bool force); - void closeImplStart(); - void closeImplWaitForIdleUdp(); - void closeImplFinish(); + void closeImplPart1(); + void closeImplPart2(); void onNowTimer(); @@ -976,9 +993,9 @@ private: std::string const torrent_dir_; std::string const blocklist_dir_; - std::unique_ptr const event_base_; + std::unique_ptr const session_thread_; - // depends-on: event_base_ + // depends-on: session_thread_ std::unique_ptr const timer_maker_; /// static fields @@ -1031,16 +1048,17 @@ private: tr_session_id session_id_; - tr_bindinfo bind_ipv4_ = tr_bindinfo{ tr_inaddr_any }; - tr_bindinfo bind_ipv6_ = tr_bindinfo{ tr_in6addr_any }; + std::vector blocklists_; /// other fields - std::vector blocklists_; + // depends-on: session_thread_ + tr_bindinfo bind_ipv4_ = tr_bindinfo{ tr_inaddr_any }; + + // depends-on: session_thread_ + tr_bindinfo bind_ipv6_ = tr_bindinfo{ tr_in6addr_any }; public: - struct tr_event_handle* events = nullptr; - // depends-on: announcer_udp_ // FIXME(ckerr): circular dependency udp_core -> announcer_udp -> announcer_udp_mediator -> udp_core std::unique_ptr udp_core_; @@ -1056,7 +1074,7 @@ private: PortForwardingMediator port_forwarding_mediator_{ *this }; std::unique_ptr port_forwarding_ = tr_port_forwarding::create(port_forwarding_mediator_); - // depends-on: events, top_bandwidth_ + // depends-on: session_thread_, top_bandwidth_ AltSpeedMediator alt_speed_mediator_{ *this }; tr_session_alt_speeds alt_speeds_{ alt_speed_mediator_ }; @@ -1065,17 +1083,17 @@ private: // depends-on: open_files_ tr_torrents torrents_; - // depends-on: timer_maker_, top_bandwidth_, torrents_ - std::unique_ptr peer_mgr_; + // depends-on: settings_, session_thread_, torrents_ + WebMediator web_mediator_{ this }; + std::unique_ptr web_ = tr_web::create(this->web_mediator_); public: // depends-on: settings_, open_files_, torrents_ std::unique_ptr cache = std::make_unique(torrents_, 1024 * 1024 * 2); private: - // depends-on: settings_, events, torrents_ - WebMediator web_mediator_{ this }; - std::unique_ptr web_ = tr_web::create(this->web_mediator_); + // depends-on: timer_maker_, top_bandwidth_, torrents_, web_ + std::unique_ptr peer_mgr_; // depends-on: peer_mgr_, torrents_ LpdMediator lpd_mediator_{ *this }; @@ -1090,11 +1108,11 @@ public: // depends-on: announcer_udp_mediator_ std::unique_ptr announcer_udp_ = tr_announcer_udp::create(announcer_udp_mediator_); - // depends-on: settings_, torrents_, announcer_udp_ + // depends-on: settings_, torrents_, web_, announcer_udp_ struct tr_announcer* announcer = nullptr; private: - // depends-on: event_base_, timer_maker_, settings_, torrents_ + // depends-on: session_thread_, timer_maker_, settings_, torrents_, web_ std::unique_ptr rpc_server_; // depends-on: alt_speeds_, udp_core_, torrents_ diff --git a/libtransmission/timer-ev.cc b/libtransmission/timer-ev.cc index 13f346bc0..94be5b2b9 100644 --- a/libtransmission/timer-ev.cc +++ b/libtransmission/timer-ev.cc @@ -14,6 +14,23 @@ using namespace std::literals; +namespace +{ + +struct EventDeleter +{ + void operator()(struct event* event) + { + if (event != nullptr) + { + event_del(event); + event_free(event); + } + } +}; + +} // namespace + namespace libtransmission { @@ -31,15 +48,11 @@ public: EvTimer& operator=(EvTimer&&) = delete; EvTimer& operator=(EvTimer const&) = delete; - ~EvTimer() override - { - stop(); - event_free(evtimer_); - } + ~EvTimer() override = default; void stop() override { - evtimer_del(evtimer_); + evtimer_.reset(); } void start() override @@ -63,8 +76,7 @@ public: interval_ = interval; - // if evtimer_ is already running, update its interval - if (auto const is_pending = event_pending(evtimer_, EV_TIMEOUT, nullptr); is_pending != 0) + if (evtimer_) // update the timer if it's already running { restart(); } @@ -78,28 +90,20 @@ public: void setRepeating(bool repeating) override { is_repeating_ = repeating; - - if (evtimer_ != nullptr) - { - event_del(evtimer_); - event_free(evtimer_); - } - - evtimer_ = repeating ? event_new(base_, -1, EV_TIMEOUT | EV_PERSIST, onTimer, this) : - event_new(base_, -1, EV_TIMEOUT, onTimer, this); + evtimer_.reset(); } private: void restart() { - stop(); + evtimer_.reset(event_new(base_, -1, EV_TIMEOUT | (isRepeating() ? EV_PERSIST : 0), onTimer, this)); using namespace std::chrono; auto const secs = duration_cast(interval_); auto tv = timeval{}; tv.tv_sec = secs.count(); tv.tv_usec = static_cast(duration_cast(interval_ - secs).count()); - evtimer_add(evtimer_, &tv); + evtimer_add(evtimer_.get(), &tv); } static void onTimer(evutil_socket_t /*unused*/, short /*unused*/, void* vself) @@ -114,7 +118,7 @@ private: } struct event_base* const base_; - struct event* evtimer_ = nullptr; + std::unique_ptr evtimer_; std::function callback_; std::chrono::milliseconds interval_ = 100ms; diff --git a/libtransmission/torrent.cc b/libtransmission/torrent.cc index 22bb03e52..4b815348b 100644 --- a/libtransmission/torrent.cc +++ b/libtransmission/torrent.cc @@ -48,7 +48,6 @@ #include "torrent-metainfo.h" #include "torrent.h" #include "tr-assert.h" -#include "trevent.h" /* tr_runInEventThread() */ #include "utils.h" #include "version.h" #include "web-utils.h" @@ -886,7 +885,7 @@ void tr_torrentManualUpdate(tr_torrent* tor) { TR_ASSERT(tr_isTorrent(tor)); - tr_runInEventThread(tor->session, tr_torrentManualUpdateImpl, tor); + tor->session->runInSessionThread(tr_torrentManualUpdateImpl, tor); } bool tr_torrentCanManualUpdate(tr_torrent const* tor) @@ -1384,7 +1383,7 @@ static void torrentStart(tr_torrent* tor, torrent_start_opts opts) tr_torrentUnsetPeerId(tor); tor->isRunning = true; tor->setDirty(); - tr_runInEventThread(tor->session, torrentStartImpl, tor); + tor->session->runInSessionThread(torrentStartImpl, tor); } void tr_torrentStart(tr_torrent* tor) @@ -1408,7 +1407,7 @@ void tr_torrentStartNow(tr_torrent* tor) static void onVerifyDoneThreadFunc(tr_torrent* const tor) { - TR_ASSERT(tr_amInEventThread(tor->session)); + TR_ASSERT(tor->session->amInSessionThread()); if (tor->isDeleting) { @@ -1434,12 +1433,12 @@ void tr_torrentOnVerifyDone(tr_torrent* tor, bool aborted) return; } - tr_runInEventThread(tor->session, onVerifyDoneThreadFunc, tor); + tor->session->runInSessionThread(onVerifyDoneThreadFunc, tor); } static void verifyTorrent(tr_torrent* const tor) { - TR_ASSERT(tr_amInEventThread(tor->session)); + TR_ASSERT(tor->session->amInSessionThread()); auto const lock = tor->unique_lock(); if (tor->isDeleting) @@ -1470,7 +1469,7 @@ static void verifyTorrent(tr_torrent* const tor) void tr_torrentVerify(tr_torrent* tor) { - tr_runInEventThread(tor->session, verifyTorrent, tor); + tor->session->runInSessionThread(verifyTorrent, tor); } void tr_torrentSave(tr_torrent* tor) @@ -1487,7 +1486,7 @@ void tr_torrentSave(tr_torrent* tor) static void stopTorrent(tr_torrent* const tor) { TR_ASSERT(tr_isTorrent(tor)); - TR_ASSERT(tr_amInEventThread(tor->session)); + TR_ASSERT(tor->session->amInSessionThread()); auto const lock = tor->unique_lock(); if (!tor->session->isClosing()) @@ -1532,13 +1531,14 @@ void tr_torrentStop(tr_torrent* tor) tor->isRunning = false; tor->isStopping = false; tor->setDirty(); - tr_runInEventThread(tor->session, stopTorrent, tor); + tor->session->runInSessionThread(stopTorrent, tor); } -static void closeTorrent(tr_torrent* const tor) +void tr_torrentFreeInSessionThread(tr_torrent* tor) { TR_ASSERT(tr_isTorrent(tor)); - TR_ASSERT(tr_amInEventThread(tor->session)); + TR_ASSERT(tor->session != nullptr); + TR_ASSERT(tor->session->amInSessionThread()); if (!tor->session->isClosing()) { @@ -1559,26 +1559,12 @@ static void closeTorrent(tr_torrent* const tor) freeTorrent(tor); } -void tr_torrentFree(tr_torrent* tor) -{ - if (tr_isTorrent(tor)) - { - tr_session* session = tor->session; - - TR_ASSERT(session != nullptr); - - auto const lock = tor->unique_lock(); - - tr_runInEventThread(session, closeTorrent, tor); - } -} - static bool removeTorrentFile(char const* filename, void* /*user_data*/, tr_error** error) { return tr_sys_path_remove(filename, error); } -static void removeTorrentInEventThread(tr_torrent* tor, bool delete_flag, tr_fileFunc delete_func, void* user_data) +static void removeTorrentInSessionThread(tr_torrent* tor, bool delete_flag, tr_fileFunc delete_func, void* user_data) { auto const lock = tor->unique_lock(); @@ -1600,7 +1586,7 @@ static void removeTorrentInEventThread(tr_torrent* tor, bool delete_flag, tr_fil tor->metainfo_.files().remove(tor->currentDir(), tor->name(), delete_func_wrapper); } - closeTorrent(tor); + tr_torrentFreeInSessionThread(tor); } void tr_torrentRemove(tr_torrent* tor, bool delete_flag, tr_fileFunc delete_func, void* user_data) @@ -1609,7 +1595,7 @@ void tr_torrentRemove(tr_torrent* tor, bool delete_flag, tr_fileFunc delete_func tor->isDeleting = true; - tr_runInEventThread(tor->session, removeTorrentInEventThread, tor, delete_flag, delete_func, user_data); + tor->session->runInSessionThread(removeTorrentInSessionThread, tor, delete_flag, delete_func, user_data); } /** @@ -2059,7 +2045,7 @@ uint64_t tr_torrentGetBytesLeftToAllocate(tr_torrent const* tor) /// -static void setLocationInEventThread( +static void setLocationInSessionThread( tr_torrent* tor, std::string const& path, bool move_from_old_path, @@ -2067,7 +2053,7 @@ static void setLocationInEventThread( int volatile* setme_state) { TR_ASSERT(tr_isTorrent(tor)); - TR_ASSERT(tr_amInEventThread(tor->session)); + TR_ASSERT(tor->session->amInSessionThread()); auto ok = bool{ true }; if (move_from_old_path) @@ -2125,9 +2111,8 @@ void tr_torrent::setLocation( *setme_state = TR_LOC_MOVING; } - tr_runInEventThread( - this->session, - setLocationInEventThread, + this->session->runInSessionThread( + setLocationInSessionThread, this, std::string{ location }, move_from_old_path, @@ -2236,7 +2221,7 @@ static void tr_torrentPieceCompleted(tr_torrent* tor, tr_piece_index_t piece_ind void tr_torrentGotBlock(tr_torrent* tor, tr_block_index_t block) { TR_ASSERT(tr_isTorrent(tor)); - TR_ASSERT(tr_amInEventThread(tor->session)); + TR_ASSERT(tor->session->amInSessionThread()); bool const block_is_new = !tor->hasBlock(block); @@ -2626,8 +2611,7 @@ void tr_torrent::renamePath( tr_torrent_rename_done_func callback, void* callback_user_data) { - tr_runInEventThread( - this->session, + this->session->runInSessionThread( torrentRenamePath, this, std::string{ oldpath }, diff --git a/libtransmission/torrent.h b/libtransmission/torrent.h index a39828a79..62f88de0f 100644 --- a/libtransmission/torrent.h +++ b/libtransmission/torrent.h @@ -40,10 +40,10 @@ struct tr_torrent; struct tr_torrent_announcer; /** -*** Package-visible ctor API +*** Package-visible **/ -void tr_torrentFree(tr_torrent* tor); +void tr_torrentFreeInSessionThread(tr_torrent* tor); void tr_ctorInitTorrentPriorities(tr_ctor const* ctor, tr_torrent* tor); diff --git a/libtransmission/trevent.cc b/libtransmission/trevent.cc deleted file mode 100644 index 1271ea760..000000000 --- a/libtransmission/trevent.cc +++ /dev/null @@ -1,275 +0,0 @@ -// This file Copyright © 2007-2022 Mnemosyne LLC. -// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only), -// or any future license endorsed by Mnemosyne LLC. -// License text can be found in the licenses/ folder. - -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#ifdef _WIN32 -#include -#endif - -#include -#include - -#include "transmission.h" - -#include "log.h" -#include "net.h" -#include "session.h" -#include "tr-assert.h" -#include "trevent.h" -#include "utils.h" - -/*** -**** -***/ - -namespace -{ -namespace tr_evthread_init_helpers -{ -void* lock_alloc(unsigned /*locktype*/) -{ - return new std::recursive_mutex{}; -} - -void lock_free(void* vlock, unsigned /*locktype*/) -{ - delete static_cast(vlock); -} - -int lock_lock(unsigned mode, void* vlock) -{ - auto* lock = static_cast(vlock); - if ((mode & EVTHREAD_TRY) != 0U) - { - auto const success = lock->try_lock(); - return success ? 0 : -1; - } - lock->lock(); - return 0; -} - -int lock_unlock(unsigned /*mode*/, void* vlock) -{ - static_cast(vlock)->unlock(); - return 0; -} - -void* cond_alloc(unsigned /*condflags*/) -{ - return new std::condition_variable_any(); -} - -void cond_free(void* vcond) -{ - delete static_cast(vcond); -} - -int cond_signal(void* vcond, int broadcast) -{ - auto* cond = static_cast(vcond); - if (broadcast != 0) - { - cond->notify_all(); - } - else - { - cond->notify_one(); - } - return 0; -} - -int cond_wait(void* vcond, void* vlock, struct timeval const* tv) -{ - auto* cond = static_cast(vcond); - auto* lock = static_cast(vlock); - if (tv == nullptr) - { - cond->wait(*lock); - return 0; - } - - auto const duration = std::chrono::seconds(tv->tv_sec) + std::chrono::microseconds(tv->tv_usec); - auto const success = cond->wait_for(*lock, duration); - return success == std::cv_status::timeout ? 1 : 0; -} - -unsigned long thread_current_id() -{ - return std::hash()(std::this_thread::get_id()); -} - -auto evthread_flag = std::once_flag{}; - -void initEvthreadsOnce() -{ - tr_net_init(); - - evthread_lock_callbacks constexpr LockCbs{ - EVTHREAD_LOCK_API_VERSION, EVTHREAD_LOCKTYPE_RECURSIVE, lock_alloc, lock_free, lock_lock, lock_unlock - }; - evthread_set_lock_callbacks(&LockCbs); - - evthread_condition_callbacks constexpr CondCbs{ EVTHREAD_CONDITION_API_VERSION, - cond_alloc, - cond_free, - cond_signal, - cond_wait }; - evthread_set_condition_callbacks(&CondCbs); - - evthread_set_id_callback(thread_current_id); -} - -} // namespace tr_evthread_init_helpers -} // namespace - -void tr_evthread_init() -{ - using namespace tr_evthread_init_helpers; - std::call_once(evthread_flag, initEvthreadsOnce); -} - -/*** -**** -***/ - -struct tr_event_handle -{ - using callback = std::function; - - using work_queue_t = std::list; - work_queue_t work_queue; - std::condition_variable work_queue_cv; - std::mutex work_queue_mutex; - event* work_queue_event = nullptr; - - tr_session* session = nullptr; - std::thread::id thread_id; -}; - -static void onWorkAvailable(evutil_socket_t /*fd*/, short /*flags*/, void* vsession) -{ - // invariant - auto* const session = static_cast(vsession); - TR_ASSERT(tr_amInEventThread(session)); - - // steal the work queue - auto* events = session->events; - auto work_queue_lock = std::unique_lock(events->work_queue_mutex); - auto work_queue = tr_event_handle::work_queue_t{}; - std::swap(work_queue, events->work_queue); - work_queue_lock.unlock(); - - // process the work queue - for (auto const& func : work_queue) - { - func(); - } -} - -static void libeventThreadFunc(tr_event_handle* events) -{ -#ifndef _WIN32 - /* Don't exit when writing on a broken socket */ - (void)signal(SIGPIPE, SIG_IGN); -#endif - - tr_evthread_init(); - - // create the libevent base - auto* base = events->session->eventBase(); - - // initialize the session struct's event fields - events->work_queue_event = event_new(base, -1, 0, onWorkAvailable, events->session); - events->session->events = events; - - // tell the thread that's waiting in tr_eventInit() - // that this thread is ready for business - events->work_queue_cv.notify_one(); - - // loop until `tr_eventClose()` kills the loop - event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY); - - // shut down the thread - event_free(events->work_queue_event); - events->session->events = nullptr; - delete events; - tr_logAddTrace("Closing libevent thread"); -} - -void tr_eventInit(tr_session* session) -{ - session->events = nullptr; - - auto* const events = new tr_event_handle(); - events->session = session; - - auto lock = std::unique_lock(events->work_queue_mutex); - auto thread = std::thread(libeventThreadFunc, events); - events->thread_id = thread.get_id(); - thread.detach(); - // wait until the libevent thread is running - events->work_queue_cv.wait(lock, [session] { return session->events != nullptr; }); -} - -void tr_eventClose(tr_session* session) -{ - TR_ASSERT(session != nullptr); - - auto* events = session->events; - if (events == nullptr) - { - return; - } - - event_base_loopexit(session->eventBase(), nullptr); - - tr_logAddTrace("closing trevent pipe"); -} - -/** -*** -**/ - -bool tr_amInEventThread(tr_session const* session) -{ - TR_ASSERT(session != nullptr); - TR_ASSERT(session->events != nullptr); - - return std::this_thread::get_id() == session->events->thread_id; -} - -/** -*** -**/ - -void tr_runInEventThread(tr_session* session, std::function&& func) -{ - TR_ASSERT(session != nullptr); - auto* events = session->events; - TR_ASSERT(events != nullptr); - - if (tr_amInEventThread(session)) - { - func(); - } - else - { - auto lock = std::unique_lock(events->work_queue_mutex); - events->work_queue.emplace_back(std::move(func)); - lock.unlock(); - - event_active(events->work_queue_event, 0, {}); - } -} diff --git a/libtransmission/trevent.h b/libtransmission/trevent.h deleted file mode 100644 index 74878bf24..000000000 --- a/libtransmission/trevent.h +++ /dev/null @@ -1,39 +0,0 @@ -// This file Copyright © 2007-2022 Mnemosyne LLC. -// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only), -// or any future license endorsed by Mnemosyne LLC. -// License text can be found in the licenses/ folder. - -#pragma once - -#ifndef __TRANSMISSION__ -#error only libtransmission should #include this header. -#endif - -#include -#include -#include - -#include "tr-macros.h" - -struct tr_session; - -void tr_evthread_init(); - -void tr_eventInit(tr_session* session); - -void tr_eventClose(tr_session* session); - -bool tr_amInEventThread(tr_session const* session); - -void tr_runInEventThread(tr_session* session, std::function&& func); - -template -void tr_runInEventThread(tr_session* session, Func&& func, Args&&... args) -{ - tr_runInEventThread( - session, - std::function{ [func = std::forward(func), args = std::make_tuple(std::forward(args)...)]() - { - std::apply(std::move(func), std::move(args)); - } }); -} diff --git a/libtransmission/web.cc b/libtransmission/web.cc index 70530cf9d..8c14f0905 100644 --- a/libtransmission/web.cc +++ b/libtransmission/web.cc @@ -154,7 +154,7 @@ public: curl_thread->join(); } - void closeSoon() + void startShutdown() { run_mode = RunMode::CloseSoon; queued_tasks_cv.notify_one(); @@ -635,7 +635,7 @@ bool tr_web::isClosed() const noexcept return impl_->isClosed(); } -void tr_web::closeSoon() +void tr_web::startShutdown() { - impl_->closeSoon(); + impl_->startShutdown(); } diff --git a/libtransmission/web.h b/libtransmission/web.h index 08fb702f9..53e148a6e 100644 --- a/libtransmission/web.h +++ b/libtransmission/web.h @@ -91,7 +91,7 @@ public: // Notify tr_web that it's going to be destroyed soon. // New fetch() tasks will be rejected, but already-running tasks // are left alone so that they can finish. - void closeSoon(); + void startShutdown(); // True when tr_web is ready to be destroyed. // Will never be true until after closeSoon() is called. diff --git a/libtransmission/webseed.cc b/libtransmission/webseed.cc index 0403b0d50..e1aae1951 100644 --- a/libtransmission/webseed.cc +++ b/libtransmission/webseed.cc @@ -25,7 +25,6 @@ #include "peer-mgr.h" #include "timer.h" #include "torrent.h" -#include "trevent.h" // tr_runInEventThread() #include "utils.h" #include "web-utils.h" #include "web.h" @@ -396,7 +395,7 @@ void useFetchedBlocks(tr_webseed_task* task) block_buf->resize(block_size); evbuffer_remove(task->content(), std::data(*block_buf), std::size(*block_buf)); auto* const data = new write_block_data{ session, tor->id(), task->loc.block, block_buf, webseed }; - tr_runInEventThread(session, &write_block_data::write_block_func, data); + session->runInSessionThread(&write_block_data::write_block_func, data); } task->loc = tor->byteLoc(task->loc.byte + block_size); diff --git a/tests/libtransmission/move-test.cc b/tests/libtransmission/move-test.cc index cac458b88..91da61b24 100644 --- a/tests/libtransmission/move-test.cc +++ b/tests/libtransmission/move-test.cc @@ -105,7 +105,7 @@ TEST_P(IncompleteDirTest, incompleteDir) data.buf = std::make_unique>(tr_block_info::BlockSize, '\0'); data.block = block_index; data.done = false; - tr_runInEventThread(session_, test_incomplete_dir_threadfunc, &data); + session_->runInSessionThread(test_incomplete_dir_threadfunc, &data); auto const test = [&data]() { diff --git a/tests/libtransmission/test-fixtures.h b/tests/libtransmission/test-fixtures.h index bc225a1d5..c924ad86f 100644 --- a/tests/libtransmission/test-fixtures.h +++ b/tests/libtransmission/test-fixtures.h @@ -23,7 +23,6 @@ #include "platform.h" // TR_PATH_DELIMITER #include "quark.h" #include "torrent.h" -#include "trevent.h" // tr_amInEventThread() #include "utils.h" #include "variant.h" @@ -490,7 +489,7 @@ protected: void blockingTorrentVerify(tr_torrent* tor) const { EXPECT_NE(nullptr, tor->session); - EXPECT_FALSE(tr_amInEventThread(tor->session)); + EXPECT_FALSE(tor->session->amInSessionThread()); auto const n_previously_verified = std::size(verified_); tr_torrentVerify(tor); waitFor(