// This file Copyright © 2007-2022 Mnemosyne LLC. // It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only), // or any future license endorsed by Mnemosyne LLC. // License text can be found in the licenses/ folder. #include #include #include #include #include #include #include #include // for std::move(), std::swap() #include #ifdef _WIN32 #include #endif #include #include #include "transmission.h" #include "log.h" #include "session-thread.h" #include "tr-assert.h" #include "utils.h" // for tr_net_init() using namespace std::literals; /// namespace { namespace tr_evthread_init_helpers { void* lock_alloc(unsigned /*locktype*/) { return new std::recursive_mutex{}; } void lock_free(void* vlock, unsigned /*locktype*/) { delete static_cast(vlock); } int lock_lock(unsigned mode, void* vlock) { auto* lock = static_cast(vlock); if ((mode & EVTHREAD_TRY) != 0U) { auto const success = lock->try_lock(); return success ? 0 : -1; } lock->lock(); return 0; } int lock_unlock(unsigned /*mode*/, void* vlock) { static_cast(vlock)->unlock(); return 0; } void* cond_alloc(unsigned /*condflags*/) { return new std::condition_variable_any(); } void cond_free(void* vcond) { delete static_cast(vcond); } int cond_signal(void* vcond, int broadcast) { auto* cond = static_cast(vcond); if (broadcast != 0) { cond->notify_all(); } else { cond->notify_one(); } return 0; } int cond_wait(void* vcond, void* vlock, struct timeval const* tv) { auto* cond = static_cast(vcond); auto* lock = static_cast(vlock); if (tv == nullptr) { cond->wait(*lock); return 0; } auto const duration = std::chrono::seconds(tv->tv_sec) + std::chrono::microseconds(tv->tv_usec); auto const success = cond->wait_for(*lock, duration); return success == std::cv_status::timeout ? 1 : 0; } unsigned long thread_current_id() { return std::hash()(std::this_thread::get_id()); } auto evthread_flag = std::once_flag{}; void initEvthreadsOnce() { tr_net_init(); evthread_lock_callbacks constexpr LockCbs{ EVTHREAD_LOCK_API_VERSION, EVTHREAD_LOCKTYPE_RECURSIVE, lock_alloc, lock_free, lock_lock, lock_unlock }; evthread_set_lock_callbacks(&LockCbs); evthread_condition_callbacks constexpr CondCbs{ EVTHREAD_CONDITION_API_VERSION, cond_alloc, cond_free, cond_signal, cond_wait }; evthread_set_condition_callbacks(&CondCbs); evthread_set_id_callback(thread_current_id); } } // namespace tr_evthread_init_helpers auto makeEventBase() { tr_session_thread::tr_evthread_init(); return std::unique_ptr{ event_base_new(), event_base_free }; } } // namespace /// void tr_session_thread::tr_evthread_init() { using namespace tr_evthread_init_helpers; std::call_once(evthread_flag, initEvthreadsOnce); } class tr_session_thread_impl final : public tr_session_thread { public: explicit tr_session_thread_impl() : evbase_{ makeEventBase() } , work_queue_event_{ event_new(evbase_.get(), -1, 0, onWorkAvailableStatic, this), event_free } { auto lock = std::unique_lock(is_looping_mutex_); thread_ = std::thread(&tr_session_thread_impl::sessionThreadFunc, this, eventBase()); thread_id_ = thread_.get_id(); // wait for the session thread's main loop to start is_looping_cv_.wait(lock, [this]() { return is_looping_; }); } tr_session_thread_impl(tr_session_thread_impl&&) = delete; tr_session_thread_impl(tr_session_thread_impl const&) = delete; tr_session_thread_impl& operator=(tr_session_thread_impl&&) = delete; tr_session_thread_impl& operator=(tr_session_thread_impl const&) = delete; ~tr_session_thread_impl() override { TR_ASSERT(!amInSessionThread()); TR_ASSERT(is_looping_); // Stop the first event loop. This is the steady-state loop that runs // continuously, even when there are no events. See: sessionThreadFunc() is_shutting_down_ = true; event_base_loopexit(eventBase(), nullptr); // Wait on the second event loop. This is the shutdown loop that exits // as soon as there are no events. This step is to give pending tasks // a chance to finish. auto lock = std::unique_lock(is_looping_mutex_); is_looping_cv_.wait_for(lock, Deadline, [this]() { return !is_looping_; }); event_base_loopexit(eventBase(), nullptr); thread_.join(); } [[nodiscard]] struct event_base* eventBase() noexcept override { return evbase_.get(); } [[nodiscard]] bool amInSessionThread() const noexcept override { return thread_id_ == std::this_thread::get_id(); } void run(std::function&& func) override { if (amInSessionThread()) { func(); } else { work_queue_mutex_.lock(); work_queue_.emplace_back(std::move(func)); work_queue_mutex_.unlock(); event_active(work_queue_event_.get(), 0, {}); } } private: using callback = std::function; using work_queue_t = std::list; void sessionThreadFunc(struct event_base* evbase) { #ifndef _WIN32 /* Don't exit when writing on a broken socket */ (void)signal(SIGPIPE, SIG_IGN); #endif tr_evthread_init(); constexpr auto ToggleLooping = [](evutil_socket_t, short, void* vself) { auto* const self = static_cast(vself); self->is_looping_mutex_.lock(); self->is_looping_ = !self->is_looping_; self->is_looping_mutex_.unlock(); self->is_looping_cv_.notify_one(); }; event_base_once(evbase, -1, EV_TIMEOUT, ToggleLooping, this, nullptr); // Start the first event loop. This is the steady-state loop that runs // continuously until `this` is destroyed. See: ~tr_session_thread_impl() TR_ASSERT(!is_shutting_down_); event_base_loop(evbase, EVLOOP_NO_EXIT_ON_EMPTY); // Start the second event loop. This is the shutdown loop that exits as // soon as there are no events. It's used to give any remaining events // a chance to finish up before we exit. TR_ASSERT(is_shutting_down_); event_base_loop(evbase, 0); ToggleLooping({}, {}, this); } static void onWorkAvailableStatic(evutil_socket_t /*fd*/, short /*flags*/, void* vself) { static_cast(vself)->onWorkAvailable(); } void onWorkAvailable() { TR_ASSERT(amInSessionThread()); // steal the work queue auto work_queue_lock = std::unique_lock(work_queue_mutex_); auto work_queue = work_queue_t{}; std::swap(work_queue, work_queue_); work_queue_lock.unlock(); // process the work queue for (auto const& func : work_queue) { func(); } } std::unique_ptr const evbase_; std::unique_ptr const work_queue_event_; work_queue_t work_queue_; std::mutex work_queue_mutex_; std::thread thread_; std::thread::id thread_id_; std::mutex is_looping_mutex_; std::condition_variable is_looping_cv_; bool is_looping_ = false; bool is_shutting_down_ = false; static constexpr std::chrono::seconds Deadline = 5s; }; std::unique_ptr tr_session_thread::create() { return std::make_unique(); }