refactor: make `tr_session_thread` a unique_ptr owned by tr_session (#4069)

This commit is contained in:
Charles Kerr 2022-11-04 16:20:27 -05:00 committed by GitHub
parent d5cc43355c
commit 831eb8d40f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 504 additions and 554 deletions

View File

@ -153,8 +153,8 @@
A242AD9315F05D23002B3A6C /* Localizable.strings in Resources */ = {isa = PBXBuildFile; fileRef = A242AD9115F05D23002B3A6C /* Localizable.strings */; };
A2451E6916ACE4EB00586E0E /* FileRenameSheetController.mm in Sources */ = {isa = PBXBuildFile; fileRef = A2451E6716ACE4EB00586E0E /* FileRenameSheetController.mm */; };
A2451E6A16ACE4EB00586E0E /* FileRenameSheetController.xib in Resources */ = {isa = PBXBuildFile; fileRef = A2451E6816ACE4EB00586E0E /* FileRenameSheetController.xib */; };
A24621410C769D0900088E81 /* trevent.h in Headers */ = {isa = PBXBuildFile; fileRef = A24621350C769CF400088E81 /* trevent.h */; };
A24621420C769D0900088E81 /* trevent.cc in Sources */ = {isa = PBXBuildFile; fileRef = A24621360C769CF400088E81 /* trevent.cc */; };
A24621410C769D0900088E81 /* session-thread.h in Headers */ = {isa = PBXBuildFile; fileRef = A24621350C769CF400088E81 /* session-thread.h */; };
A24621420C769D0900088E81 /* session-thread.cc in Sources */ = {isa = PBXBuildFile; fileRef = A24621360C769CF400088E81 /* session-thread.cc */; };
A247A443114C701800547DFC /* InfoViewController.h in Headers */ = {isa = PBXBuildFile; fileRef = A247A442114C701800547DFC /* InfoViewController.h */; };
A24F19080A3A790800C9C145 /* Sparkle.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = A24F19070A3A790800C9C145 /* Sparkle.framework */; settings = {ATTRIBUTES = (Weak, ); }; };
A24F19210A3A796800C9C145 /* Sparkle.framework in Copy Files */ = {isa = PBXBuildFile; fileRef = A24F19070A3A790800C9C145 /* Sparkle.framework */; settings = {ATTRIBUTES = (CodeSignOnCopy, ); }; };
@ -874,8 +874,8 @@
A2451E6616ACE4EB00586E0E /* FileRenameSheetController.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = FileRenameSheetController.h; sourceTree = "<group>"; };
A2451E6716ACE4EB00586E0E /* FileRenameSheetController.mm */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.objcpp; path = FileRenameSheetController.mm; sourceTree = "<group>"; };
A2451E6816ACE4EB00586E0E /* FileRenameSheetController.xib */ = {isa = PBXFileReference; lastKnownFileType = file.xib; path = FileRenameSheetController.xib; sourceTree = "<group>"; };
A24621350C769CF400088E81 /* trevent.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = trevent.h; sourceTree = "<group>"; };
A24621360C769CF400088E81 /* trevent.cc */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = trevent.cc; sourceTree = "<group>"; };
A24621350C769CF400088E81 /* session-thread.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = session-thread.h; sourceTree = "<group>"; };
A24621360C769CF400088E81 /* session-thread.cc */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = session-thread.cc; sourceTree = "<group>"; };
A247A442114C701800547DFC /* InfoViewController.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = InfoViewController.h; sourceTree = "<group>"; };
A24F19070A3A790800C9C145 /* Sparkle.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; path = Sparkle.framework; sourceTree = "<group>"; };
A25485390EB66CBB004539DA /* codelength.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = codelength.h; sourceTree = "<group>"; };
@ -1730,8 +1730,8 @@
A2679292130E00A000CB7464 /* tr-utp.cc */,
A2679293130E00A000CB7464 /* tr-utp.h */,
BEFC1DF50C07861A00B0BB3C /* transmission.h */,
A24621360C769CF400088E81 /* trevent.cc */,
A24621350C769CF400088E81 /* trevent.h */,
A24621360C769CF400088E81 /* session-thread.cc */,
A24621350C769CF400088E81 /* session-thread.h */,
BEFC1DF40C07861A00B0BB3C /* port-forwarding-upnp.cc */,
BEFC1DF30C07861A00B0BB3C /* port-forwarding-upnp.h */,
BEFC1DF20C07861A00B0BB3C /* utils.cc */,
@ -2176,7 +2176,7 @@
BEFC1E550C07861A00B0BB3C /* completion.h in Headers */,
BEFC1E570C07861A00B0BB3C /* clients.h in Headers */,
A2BE9C530C1E4AF7002D16E6 /* makemeta.h in Headers */,
A24621410C769D0900088E81 /* trevent.h in Headers */,
A24621410C769D0900088E81 /* session-thread.h in Headers */,
4D36BA700CA2F00800A63CA5 /* peer-mse.h in Headers */,
C10C644E1D9AF328003C1B4C /* session-id.h in Headers */,
4D36BA730CA2F00800A63CA5 /* handshake.h in Headers */,
@ -2904,7 +2904,7 @@
BEFC1E560C07861A00B0BB3C /* completion.cc in Sources */,
BEFC1E580C07861A00B0BB3C /* clients.cc in Sources */,
A2BE9C520C1E4AF5002D16E6 /* makemeta.cc in Sources */,
A24621420C769D0900088E81 /* trevent.cc in Sources */,
A24621420C769D0900088E81 /* session-thread.cc in Sources */,
C11DEA161FCD31C0009E22B9 /* subprocess-posix.cc in Sources */,
4D36BA6F0CA2F00800A63CA5 /* peer-mse.cc in Sources */,
4D36BA720CA2F00800A63CA5 /* handshake.cc in Sources */,

View File

@ -53,6 +53,7 @@ set(PROJECT_FILES
session-id.cc
session-alt-speeds.cc
session-settings.cc
session-thread.cc
session.cc
stats.cc
subprocess-posix.cc
@ -71,7 +72,6 @@ set(PROJECT_FILES
tr-lpd.cc
tr-udp.cc
tr-utp.cc
trevent.cc
utils.cc
variant-benc.cc
variant-json.cc
@ -197,6 +197,7 @@ set(${PROJECT_NAME}_PRIVATE_HEADERS
resume.h
rpc-server.h
session-alt-speeds.h
session-thread.h
session.h
stats.h
subprocess.h
@ -208,7 +209,6 @@ set(${PROJECT_NAME}_PRIVATE_HEADERS
tr-dht.h
tr-lpd.h
tr-utp.h
trevent.h
variant-common.h
verify.h
version.h

View File

@ -480,7 +480,6 @@ std::shared_ptr<tr_peerIo> tr_peerIo::create(
struct tr_peer_socket const socket)
{
TR_ASSERT(session != nullptr);
TR_ASSERT(session->events != nullptr);
auto lock = session->unique_lock();
TR_ASSERT(isSupportedSocket(socket));
@ -596,7 +595,6 @@ std::shared_ptr<tr_peerIo> tr_peerIo::newOutgoing(
static void event_enable(tr_peerIo* io, short event)
{
TR_ASSERT(io->session != nullptr);
TR_ASSERT(io->session->events != nullptr);
bool const need_events = io->socket.type == TR_PEER_SOCKET_TYPE_TCP;
@ -633,8 +631,6 @@ static void event_enable(tr_peerIo* io, short event)
static void event_disable(tr_peerIo* io, short event)
{
TR_ASSERT(io->session->events != nullptr);
bool const need_events = io->socket.type == TR_PEER_SOCKET_TYPE_TCP;
if (need_events)
@ -730,7 +726,6 @@ static void io_close_socket(tr_peerIo* io)
tr_peerIo::~tr_peerIo()
{
auto const lock = session->unique_lock();
TR_ASSERT(session->events != nullptr);
clearCallbacks();
tr_logAddTraceIo(this, "in tr_peerIo destructor");

View File

@ -565,11 +565,11 @@ struct tr_peerMgr
explicit tr_peerMgr(tr_session* session_in)
: session{ session_in }
, bandwidth_timer_{ session->timerMaker().create([this]() { bandwidthPulse(); }) }
, rechoke_timer_{ session->timerMaker().create([this]() { rechokePulseMarshall(); }) }
, rechoke_timer_{ session->timerMaker().create([this]() { rechokePulse(); }) }
, refill_upkeep_timer_{ session->timerMaker().create([this]() { refillUpkeep(); }) }
{
bandwidth_timer_->startRepeating(BandwidthPeriod);
rechoke_timer_->startSingleShot(RechokePeriod);
rechoke_timer_->startRepeating(RechokePeriod);
refill_upkeep_timer_->startRepeating(RefillUpkeepPeriod);
}
@ -604,12 +604,6 @@ struct tr_peerMgr
Handshakes incoming_handshakes;
private:
void rechokePulseMarshall()
{
rechokePulse();
rechoke_timer_->startSingleShot(RechokePeriod);
}
std::unique_ptr<libtransmission::Timer> const bandwidth_timer_;
std::unique_ptr<libtransmission::Timer> const rechoke_timer_;
std::unique_ptr<libtransmission::Timer> const refill_upkeep_timer_;

View File

@ -45,7 +45,6 @@
#include "timer.h"
#include "tr-assert.h"
#include "tr-strbuf.h"
#include "trevent.h"
#include "utils.h"
#include "variant.h"
#include "web-utils.h"
@ -755,8 +754,7 @@ void tr_rpc_server::setEnabled(bool is_enabled)
{
is_enabled_ = is_enabled;
tr_runInEventThread(
this->session,
session->runInSessionThread(
[this]()
{
if (!is_enabled_)
@ -790,7 +788,7 @@ void tr_rpc_server::setPort(tr_port port) noexcept
if (isEnabled())
{
tr_runInEventThread(session, restartServer, this);
session->runInSessionThread(&restartServer, this);
}
}
@ -931,7 +929,7 @@ void tr_rpc_server::load(tr_variant* src)
{
auto const rpc_uri = tr_rpc_address_with_port(this) + this->url_;
tr_logAddInfo(fmt::format(_("Serving RPC and Web requests on {address}"), fmt::arg("address", rpc_uri)));
tr_runInEventThread(session, startServer, this);
session->runInSessionThread(startServer, this);
if (this->isWhitelistEnabled())
{

View File

@ -0,0 +1,296 @@
// This file Copyright © 2007-2022 Mnemosyne LLC.
// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only),
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#include <chrono>
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility> // for std::move(), std::swap()
#include <csignal>
#ifdef _WIN32
#include <winsock2.h>
#endif
#include <event2/event.h>
#include <event2/thread.h>
#include "transmission.h"
#include "log.h"
#include "session-thread.h"
#include "tr-assert.h"
#include "utils.h" // for tr_net_init()
using namespace std::literals;
///
namespace
{
namespace tr_evthread_init_helpers
{
void* lock_alloc(unsigned /*locktype*/)
{
return new std::recursive_mutex{};
}
void lock_free(void* vlock, unsigned /*locktype*/)
{
delete static_cast<std::recursive_mutex*>(vlock);
}
int lock_lock(unsigned mode, void* vlock)
{
auto* lock = static_cast<std::recursive_mutex*>(vlock);
if ((mode & EVTHREAD_TRY) != 0U)
{
auto const success = lock->try_lock();
return success ? 0 : -1;
}
lock->lock();
return 0;
}
int lock_unlock(unsigned /*mode*/, void* vlock)
{
static_cast<std::recursive_mutex*>(vlock)->unlock();
return 0;
}
void* cond_alloc(unsigned /*condflags*/)
{
return new std::condition_variable_any();
}
void cond_free(void* vcond)
{
delete static_cast<std::condition_variable_any*>(vcond);
}
int cond_signal(void* vcond, int broadcast)
{
auto* cond = static_cast<std::condition_variable_any*>(vcond);
if (broadcast != 0)
{
cond->notify_all();
}
else
{
cond->notify_one();
}
return 0;
}
int cond_wait(void* vcond, void* vlock, struct timeval const* tv)
{
auto* cond = static_cast<std::condition_variable_any*>(vcond);
auto* lock = static_cast<std::recursive_mutex*>(vlock);
if (tv == nullptr)
{
cond->wait(*lock);
return 0;
}
auto const duration = std::chrono::seconds(tv->tv_sec) + std::chrono::microseconds(tv->tv_usec);
auto const success = cond->wait_for(*lock, duration);
return success == std::cv_status::timeout ? 1 : 0;
}
unsigned long thread_current_id()
{
return std::hash<std::thread::id>()(std::this_thread::get_id());
}
auto evthread_flag = std::once_flag{};
void initEvthreadsOnce()
{
tr_net_init();
evthread_lock_callbacks constexpr LockCbs{
EVTHREAD_LOCK_API_VERSION, EVTHREAD_LOCKTYPE_RECURSIVE, lock_alloc, lock_free, lock_lock, lock_unlock
};
evthread_set_lock_callbacks(&LockCbs);
evthread_condition_callbacks constexpr CondCbs{ EVTHREAD_CONDITION_API_VERSION,
cond_alloc,
cond_free,
cond_signal,
cond_wait };
evthread_set_condition_callbacks(&CondCbs);
evthread_set_id_callback(thread_current_id);
}
} // namespace tr_evthread_init_helpers
auto makeEventBase()
{
tr_session_thread::tr_evthread_init();
return std::unique_ptr<event_base, void (*)(event_base*)>{ event_base_new(), event_base_free };
}
} // namespace
///
void tr_session_thread::tr_evthread_init()
{
using namespace tr_evthread_init_helpers;
std::call_once(evthread_flag, initEvthreadsOnce);
}
class tr_session_thread_impl final : public tr_session_thread
{
public:
explicit tr_session_thread_impl()
: evbase_{ makeEventBase() }
, work_queue_event_{ event_new(evbase_.get(), -1, 0, onWorkAvailableStatic, this), event_free }
{
auto lock = std::unique_lock(is_looping_mutex_);
thread_ = std::thread(&tr_session_thread_impl::sessionThreadFunc, this, eventBase());
thread_id_ = thread_.get_id();
// wait for the session thread's main loop to start
is_looping_cv_.wait(lock, [this]() { return is_looping_; });
}
tr_session_thread_impl(tr_session_thread_impl&&) = delete;
tr_session_thread_impl(tr_session_thread_impl const&) = delete;
tr_session_thread_impl& operator=(tr_session_thread_impl&&) = delete;
tr_session_thread_impl& operator=(tr_session_thread_impl const&) = delete;
~tr_session_thread_impl() override
{
TR_ASSERT(!amInSessionThread());
TR_ASSERT(is_looping_);
// Stop the first event loop. This is the steady-state loop that runs
// continuously, even when there are no events. See: sessionThreadFunc()
is_shutting_down_ = true;
event_base_loopexit(eventBase(), nullptr);
// Wait on the second event loop. This is the shutdown loop that exits
// as soon as there are no events. This step is to give pending tasks
// a chance to finish.
auto lock = std::unique_lock(is_looping_mutex_);
is_looping_cv_.wait_for(lock, Deadline, [this]() { return !is_looping_; });
event_base_loopexit(eventBase(), nullptr);
thread_.join();
}
[[nodiscard]] struct event_base* eventBase() noexcept override
{
return evbase_.get();
}
[[nodiscard]] bool amInSessionThread() const noexcept override
{
return thread_id_ == std::this_thread::get_id();
}
void run(std::function<void(void)>&& func) override
{
if (amInSessionThread())
{
func();
}
else
{
work_queue_mutex_.lock();
work_queue_.emplace_back(std::move(func));
work_queue_mutex_.unlock();
event_active(work_queue_event_.get(), 0, {});
}
}
private:
using callback = std::function<void(void)>;
using work_queue_t = std::list<callback>;
void sessionThreadFunc(struct event_base* evbase)
{
#ifndef _WIN32
/* Don't exit when writing on a broken socket */
(void)signal(SIGPIPE, SIG_IGN);
#endif
tr_evthread_init();
constexpr auto ToggleLooping = [](evutil_socket_t, short, void* vself)
{
auto* const self = static_cast<tr_session_thread_impl*>(vself);
self->is_looping_mutex_.lock();
self->is_looping_ = !self->is_looping_;
self->is_looping_mutex_.unlock();
self->is_looping_cv_.notify_one();
};
event_base_once(evbase, -1, EV_TIMEOUT, ToggleLooping, this, nullptr);
// Start the first event loop. This is the steady-state loop that runs
// continuously until `this` is destroyed. See: ~tr_session_thread_impl()
TR_ASSERT(!is_shutting_down_);
event_base_loop(evbase, EVLOOP_NO_EXIT_ON_EMPTY);
// Start the second event loop. This is the shutdown loop that exits as
// soon as there are no events. It's used to give any remaining events
// a chance to finish up before we exit.
TR_ASSERT(is_shutting_down_);
event_base_loop(evbase, 0);
ToggleLooping({}, {}, this);
}
static void onWorkAvailableStatic(evutil_socket_t /*fd*/, short /*flags*/, void* vself)
{
static_cast<tr_session_thread_impl*>(vself)->onWorkAvailable();
}
void onWorkAvailable()
{
TR_ASSERT(amInSessionThread());
// steal the work queue
auto work_queue_lock = std::unique_lock(work_queue_mutex_);
auto work_queue = work_queue_t{};
std::swap(work_queue, work_queue_);
work_queue_lock.unlock();
// process the work queue
for (auto const& func : work_queue)
{
func();
}
}
std::unique_ptr<event_base, void (*)(event_base*)> const evbase_;
std::unique_ptr<event, void (*)(event*)> const work_queue_event_;
work_queue_t work_queue_;
std::mutex work_queue_mutex_;
std::thread thread_;
std::thread::id thread_id_;
std::mutex is_looping_mutex_;
std::condition_variable is_looping_cv_;
bool is_looping_ = false;
bool is_shutting_down_ = false;
static constexpr std::chrono::seconds Deadline = 5s;
};
std::unique_ptr<tr_session_thread> tr_session_thread::create()
{
return std::make_unique<tr_session_thread_impl>();
}

View File

@ -0,0 +1,42 @@
// This file Copyright © 2007-2022 Mnemosyne LLC.
// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only),
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#pragma once
#ifndef __TRANSMISSION__
#error only libtransmission should #include this header.
#endif
#include <functional>
#include <memory>
#include <tuple>
#include <utility>
struct event_base;
class tr_session_thread
{
public:
static void tr_evthread_init();
static std::unique_ptr<tr_session_thread> create();
virtual ~tr_session_thread() = default;
[[nodiscard]] virtual struct event_base* eventBase() noexcept = 0;
[[nodiscard]] virtual bool amInSessionThread() const noexcept = 0;
virtual void run(std::function<void(void)>&& func) = 0;
template<typename Func, typename... Args>
void run(Func&& func, Args&&... args)
{
run(std::function<void(void)>{
[func = std::forward<Func&&>(func), args = std::make_tuple(std::forward<Args>(args)...)]()
{
std::apply(std::move(func), std::move(args));
} });
}
};

View File

@ -55,7 +55,6 @@
#include "tr-lpd.h"
#include "tr-strbuf.h"
#include "tr-utp.h"
#include "trevent.h"
#include "utils.h"
#include "variant.h"
#include "verify.h"
@ -227,7 +226,7 @@ void tr_session::WebMediator::notifyBandwidthConsumed(int torrent_id, size_t byt
void tr_session::WebMediator::run(tr_web::FetchDoneFunc&& func, tr_web::FetchResponse&& response) const
{
tr_runInEventThread(session_, std::move(func), std::move(response));
session_->runInSessionThread(std::move(func), std::move(response));
}
void tr_sessionFetch(tr_session* session, tr_web::FetchOptions&& options)
@ -449,27 +448,15 @@ tr_session* tr_sessionInit(char const* config_dir, bool message_queueing_enabled
tr_logSetLevel(static_cast<tr_log_level>(val));
}
// start the libtransmission thread
tr_net_init(); // must go before tr_eventInit
tr_eventInit(session);
TR_ASSERT(session->events != nullptr);
auto data = tr_session::init_data{};
data.config_dir = config_dir;
data.message_queuing_enabled = message_queueing_enabled;
data.client_settings = client_settings;
// run it in the libtransmission thread
if (tr_amInEventThread(session))
{
session->initImpl(data);
}
else
{
auto lock = session->unique_lock();
tr_runInEventThread(session, [&session, &data]() { session->initImpl(data); });
data.done_cv.wait(lock); // wait for the session to be ready
}
// run initImpl() in the libtransmission thread
auto lock = session->unique_lock();
session->runInSessionThread([&session, &data]() { session->initImpl(data); });
data.done_cv.wait(lock); // wait for the session to be ready
return session;
}
@ -515,7 +502,7 @@ void tr_session::onNowTimer()
void tr_session::initImpl(init_data& data)
{
auto lock = unique_lock();
TR_ASSERT(tr_amInEventThread(this));
TR_ASSERT(amInSessionThread());
auto* const client_settings = data.client_settings;
TR_ASSERT(tr_variantIsDict(client_settings));
@ -560,7 +547,7 @@ static void updateBandwidth(tr_session* session, tr_direction dir);
void tr_session::setSettings(tr_variant* settings_dict, bool force)
{
TR_ASSERT(tr_amInEventThread(this));
TR_ASSERT(amInSessionThread());
auto* const settings = settings_dict;
TR_ASSERT(tr_variantIsDict(settings));
@ -655,7 +642,7 @@ void tr_sessionSet(tr_session* session, tr_variant* settings)
{
// run it in the libtransmission thread
if (tr_amInEventThread(session))
if (session->amInSessionThread())
{
session->setSettings(settings, false);
}
@ -664,8 +651,7 @@ void tr_sessionSet(tr_session* session, tr_variant* settings)
auto lock = session->unique_lock();
auto done_cv = std::condition_variable_any{};
tr_runInEventThread(
session,
session->runInSessionThread(
[&session, &settings, &done_cv]()
{
session->setSettings(settings, false);
@ -784,7 +770,7 @@ void tr_session::setPeerPort(tr_port port_in)
}
};
tr_runInEventThread(this, in_session_thread, port_in);
runInSessionThread(in_session_thread, port_in);
}
void tr_sessionSetPeerPort(tr_session* session, uint16_t hport)
@ -947,7 +933,7 @@ void tr_session::AltSpeedMediator::isActiveChanged(bool is_active, tr_session_al
}
};
tr_runInEventThread(&session_, in_session_thread);
session_.runInSessionThread(in_session_thread);
}
/***
@ -1178,27 +1164,26 @@ double tr_sessionGetRawSpeed_KBps(tr_session const* session, tr_direction dir)
return tr_toSpeedKBps(tr_sessionGetRawSpeed_Bps(session, dir));
}
void tr_session::closeImplStart()
void tr_session::closeImplPart1()
{
is_closing_ = true;
// close the low-hanging fruit that can be closed immediately w/o consequences
verifier_.reset();
save_timer_.reset();
now_timer_.reset();
rpc_server_.reset();
lpd_.reset();
port_forwarding_.reset();
closePeerPort();
// tell other items to start shutting down
udp_core_->startShutdown();
announcer_udp_->startShutdown();
save_timer_.reset();
now_timer_.reset();
verifier_.reset();
port_forwarding_.reset();
closePeerPort();
this->rpc_server_.reset();
/* Close the torrents. Get the most active ones first so that
* if we can't get them all closed in a reasonable amount of time,
* at least we get the most important ones first. */
// Close the torrents in order of most active to least active
// so that the most important announce=stopped events are
// fired out first...
auto torrents = getAllTorrents();
std::sort(
std::begin(torrents),
@ -1209,129 +1194,87 @@ void tr_session::closeImplStart()
auto const b_cur = b->downloadedCur + b->uploadedCur;
return a_cur > b_cur; // larger xfers go first
});
for (auto* tor : torrents)
{
tr_torrentFree(tor);
tr_torrentFreeInSessionThread(tor);
}
torrents.clear();
/* Close the announcer *after* closing the torrents
so that all the &event=stopped messages will be
queued to be sent by tr_announcerClose() */
// ...and now that all the torrents have been closed, any
// remaining `event=stopped` announce messages are queued in
// the announcer. The announcer's destructor sends all those
// out via `web_`...
tr_announcerClose(this);
/* and this goes *after* announcer close so that
it won't be idle until the announce events are sent... */
this->web_->closeSoon();
// ...and now that those are queued, tell web_ that we're
// shutting down soon. This leaves the `event=stopped` messages
// in the queue but refuses to take any _new_ tasks
this->web_->startShutdown();
this->cache.reset();
/* saveTimer is not used at this point, reusing for UDP shutdown wait */
// recycle the now-unused save_timer_ here to wait for UDP shutdown
TR_ASSERT(!save_timer_);
save_timer_ = timerMaker().create([this]() { closeImplWaitForIdleUdp(); });
save_timer_->start(1ms);
save_timer_ = timerMaker().create([this]() { closeImplPart2(); });
save_timer_->startRepeating(50ms);
}
void tr_session::closeImplWaitForIdleUdp()
void tr_session::closeImplPart2()
{
/* gotta keep udp running long enough to send out all
the &event=stopped UDP tracker messages */
// try to keep the UDP announcer alive long enough to send out
// all the &event=stopped tracker announces
if (announcer_udp_ && !announcer_udp_->isIdle())
{
announcer_udp_->upkeep();
save_timer_->start(100ms);
return;
}
closeImplFinish();
}
void tr_session::closeImplFinish()
{
save_timer_.reset();
/* we had to wait until UDP trackers were closed before closing these: */
this->announcer_udp_.reset();
this->udp_core_.reset();
stats().saveIfDirty();
peer_mgr_.reset();
tr_utpClose(this);
blocklists_.clear();
openFiles().closeAll();
is_closed_ = true;
}
static bool deadlineReached(time_t const deadline)
{
return time(nullptr) >= deadline;
}
static auto constexpr ShutdownMaxSeconds = time_t{ 20 };
void tr_sessionClose(tr_session* session)
{
TR_ASSERT(session != nullptr);
time_t const deadline = time(nullptr) + ShutdownMaxSeconds;
static auto constexpr DeadlineSecs = 10s;
auto const deadline = std::chrono::steady_clock::now() + DeadlineSecs;
auto const deadline_reached = [deadline]()
{
return std::chrono::steady_clock::now() >= deadline;
};
tr_logAddInfo(fmt::format(_("Transmission version {version} shutting down"), fmt::arg("version", LONG_VERSION_STRING)));
tr_logAddDebug(fmt::format("now is {}, deadline is {}", time(nullptr), deadline));
/* close the session */
tr_runInEventThread(session, [session]() { session->closeImplStart(); });
session->runInSessionThread([session]() { session->closeImplPart1(); });
while (!session->isClosed() && !deadlineReached(deadline))
while (!session->isClosed() && !deadline_reached())
{
tr_logAddTrace("waiting for the libtransmission thread to finish");
tr_wait_msec(10);
}
/* "port_forwarding" and "tracker" have live sockets,
* so we need to keep the transmission thread alive
* for a bit while they tell the router & tracker
* that we're closing now */
while (
(session->port_forwarding_ || !session->web_->isClosed() || session->announcer != nullptr || session->announcer_udp_) &&
!deadlineReached(deadline))
// There's usually a bit of housekeeping to do during shutdown,
// e.g. sending out `event=stopped` announcements to trackers,
// so wait a bit for the session thread to close.
while (!deadline_reached() && (!session->web_->isClosed() || session->announcer != nullptr || session->announcer_udp_))
{
tr_logAddTrace(fmt::format(
"waiting on port unmap ({}) or announcer ({})... now {} deadline {}",
"waiting on port unmap ({}) or announcer ({})... now {}",
fmt::ptr(session->port_forwarding_.get()),
fmt::ptr(session->announcer),
time(nullptr),
deadline));
time(nullptr)));
tr_wait_msec(50);
}
session->web_.reset();
/* close the libtransmission thread */
tr_eventClose(session);
while (session->events != nullptr)
{
static bool forced = false;
tr_logAddTrace(
fmt::format("waiting for libtransmission thread to finish... now {} deadline {}", time(nullptr), deadline));
tr_wait_msec(10);
if (deadlineReached(deadline) && !forced)
{
tr_logAddTrace("calling event_loopbreak()");
forced = true;
event_base_loopbreak(session->eventBase());
}
if (deadlineReached(deadline + 3))
{
tr_logAddTrace("deadline+3 reached... calling break...");
break;
}
}
delete session;
}
@ -1395,7 +1338,7 @@ size_t tr_sessionLoadTorrents(tr_session* session, tr_ctor* ctor)
data.session = session;
data.ctor = ctor;
data.done = false;
tr_runInEventThread(session, sessionLoadTorrents, &data);
session->runInSessionThread(sessionLoadTorrents, &data);
while (!data.done)
{
tr_wait_msec(100);
@ -1451,8 +1394,7 @@ void tr_sessionSetDHTEnabled(tr_session* session, bool enabled)
return;
}
tr_runInEventThread(
session,
session->runInSessionThread(
[session, enabled]()
{
session->udp_core_.reset();
@ -1502,8 +1444,7 @@ void tr_sessionSetLPDEnabled(tr_session* session, bool enabled)
return;
}
tr_runInEventThread(
session,
session->runInSessionThread(
[session, enabled]()
{
session->lpd_.reset();
@ -1598,7 +1539,7 @@ tr_bandwidth& tr_session::getBandwidthGroup(std::string_view name)
void tr_sessionSetPortForwardingEnabled(tr_session* session, bool enabled)
{
tr_runInEventThread(session, [session, enabled]() { session->port_forwarding_->setEnabled(enabled); });
session->runInSessionThread([session, enabled]() { session->port_forwarding_->setEnabled(enabled); });
}
bool tr_sessionIsPortForwardingEnabled(tr_session const* session)
@ -2220,12 +2161,6 @@ auto makeBlocklistDir(std::string_view config_dir)
return dir;
}
auto makeEventBase()
{
tr_evthread_init();
return std::unique_ptr<event_base, void (*)(event_base*)>{ event_base_new(), event_base_free };
}
} // namespace
tr_session::tr_session(std::string_view config_dir, tr_variant* settings_dict)
@ -2233,7 +2168,7 @@ tr_session::tr_session(std::string_view config_dir, tr_variant* settings_dict)
, resume_dir_{ makeResumeDir(config_dir) }
, torrent_dir_{ makeTorrentDir(config_dir) }
, blocklist_dir_{ makeBlocklistDir(config_dir) }
, event_base_{ makeEventBase() }
, session_thread_{ tr_session_thread::create() }
, timer_maker_{ std::make_unique<libtransmission::EvTimerMaker>(eventBase()) }
, settings_{ settings_dict }
, session_id_{ tr_time }

View File

@ -38,6 +38,7 @@
#include "session-alt-speeds.h"
#include "session-id.h"
#include "session-settings.h"
#include "session-thread.h"
#include "stats.h"
#include "torrents.h"
#include "tr-lpd.h"
@ -51,6 +52,7 @@ struct event_base;
class tr_lpd;
class tr_port_forwarding;
class tr_rpc_server;
class tr_session_thread;
class tr_web;
struct struct_utp_context;
struct tr_announcer;
@ -272,16 +274,32 @@ public:
return session_id_.sv();
}
[[nodiscard]] event_base* eventBase() noexcept
{
return event_base_.get();
}
[[nodiscard]] libtransmission::TimerMaker& timerMaker() noexcept
{
return *timer_maker_;
}
[[nodiscard]] auto amInSessionThread() noexcept
{
return session_thread_->amInSessionThread();
}
void runInSessionThread(std::function<void(void)>&& func)
{
session_thread_->run(std::move(func));
}
template<typename Func, typename... Args>
void runInSessionThread(Func&& func, Args&&... args)
{
session_thread_->run(std::move(func), std::move(args)...);
}
[[nodiscard]] auto eventBase() noexcept
{
return session_thread_->eventBase();
}
[[nodiscard]] constexpr auto& torrents()
{
return torrents_;
@ -880,9 +898,8 @@ private:
void initImpl(init_data&);
void setSettings(tr_variant* settings_dict, bool force);
void closeImplStart();
void closeImplWaitForIdleUdp();
void closeImplFinish();
void closeImplPart1();
void closeImplPart2();
void onNowTimer();
@ -976,9 +993,9 @@ private:
std::string const torrent_dir_;
std::string const blocklist_dir_;
std::unique_ptr<event_base, void (*)(event_base*)> const event_base_;
std::unique_ptr<tr_session_thread> const session_thread_;
// depends-on: event_base_
// depends-on: session_thread_
std::unique_ptr<libtransmission::TimerMaker> const timer_maker_;
/// static fields
@ -1031,16 +1048,17 @@ private:
tr_session_id session_id_;
tr_bindinfo bind_ipv4_ = tr_bindinfo{ tr_inaddr_any };
tr_bindinfo bind_ipv6_ = tr_bindinfo{ tr_in6addr_any };
std::vector<libtransmission::Blocklist> blocklists_;
/// other fields
std::vector<libtransmission::Blocklist> blocklists_;
// depends-on: session_thread_
tr_bindinfo bind_ipv4_ = tr_bindinfo{ tr_inaddr_any };
// depends-on: session_thread_
tr_bindinfo bind_ipv6_ = tr_bindinfo{ tr_in6addr_any };
public:
struct tr_event_handle* events = nullptr;
// depends-on: announcer_udp_
// FIXME(ckerr): circular dependency udp_core -> announcer_udp -> announcer_udp_mediator -> udp_core
std::unique_ptr<tr_udp_core> udp_core_;
@ -1056,7 +1074,7 @@ private:
PortForwardingMediator port_forwarding_mediator_{ *this };
std::unique_ptr<tr_port_forwarding> port_forwarding_ = tr_port_forwarding::create(port_forwarding_mediator_);
// depends-on: events, top_bandwidth_
// depends-on: session_thread_, top_bandwidth_
AltSpeedMediator alt_speed_mediator_{ *this };
tr_session_alt_speeds alt_speeds_{ alt_speed_mediator_ };
@ -1065,17 +1083,17 @@ private:
// depends-on: open_files_
tr_torrents torrents_;
// depends-on: timer_maker_, top_bandwidth_, torrents_
std::unique_ptr<struct tr_peerMgr, void (*)(struct tr_peerMgr*)> peer_mgr_;
// depends-on: settings_, session_thread_, torrents_
WebMediator web_mediator_{ this };
std::unique_ptr<tr_web> web_ = tr_web::create(this->web_mediator_);
public:
// depends-on: settings_, open_files_, torrents_
std::unique_ptr<Cache> cache = std::make_unique<Cache>(torrents_, 1024 * 1024 * 2);
private:
// depends-on: settings_, events, torrents_
WebMediator web_mediator_{ this };
std::unique_ptr<tr_web> web_ = tr_web::create(this->web_mediator_);
// depends-on: timer_maker_, top_bandwidth_, torrents_, web_
std::unique_ptr<struct tr_peerMgr, void (*)(struct tr_peerMgr*)> peer_mgr_;
// depends-on: peer_mgr_, torrents_
LpdMediator lpd_mediator_{ *this };
@ -1090,11 +1108,11 @@ public:
// depends-on: announcer_udp_mediator_
std::unique_ptr<tr_announcer_udp> announcer_udp_ = tr_announcer_udp::create(announcer_udp_mediator_);
// depends-on: settings_, torrents_, announcer_udp_
// depends-on: settings_, torrents_, web_, announcer_udp_
struct tr_announcer* announcer = nullptr;
private:
// depends-on: event_base_, timer_maker_, settings_, torrents_
// depends-on: session_thread_, timer_maker_, settings_, torrents_, web_
std::unique_ptr<tr_rpc_server> rpc_server_;
// depends-on: alt_speeds_, udp_core_, torrents_

View File

@ -14,6 +14,23 @@
using namespace std::literals;
namespace
{
struct EventDeleter
{
void operator()(struct event* event)
{
if (event != nullptr)
{
event_del(event);
event_free(event);
}
}
};
} // namespace
namespace libtransmission
{
@ -31,15 +48,11 @@ public:
EvTimer& operator=(EvTimer&&) = delete;
EvTimer& operator=(EvTimer const&) = delete;
~EvTimer() override
{
stop();
event_free(evtimer_);
}
~EvTimer() override = default;
void stop() override
{
evtimer_del(evtimer_);
evtimer_.reset();
}
void start() override
@ -63,8 +76,7 @@ public:
interval_ = interval;
// if evtimer_ is already running, update its interval
if (auto const is_pending = event_pending(evtimer_, EV_TIMEOUT, nullptr); is_pending != 0)
if (evtimer_) // update the timer if it's already running
{
restart();
}
@ -78,28 +90,20 @@ public:
void setRepeating(bool repeating) override
{
is_repeating_ = repeating;
if (evtimer_ != nullptr)
{
event_del(evtimer_);
event_free(evtimer_);
}
evtimer_ = repeating ? event_new(base_, -1, EV_TIMEOUT | EV_PERSIST, onTimer, this) :
event_new(base_, -1, EV_TIMEOUT, onTimer, this);
evtimer_.reset();
}
private:
void restart()
{
stop();
evtimer_.reset(event_new(base_, -1, EV_TIMEOUT | (isRepeating() ? EV_PERSIST : 0), onTimer, this));
using namespace std::chrono;
auto const secs = duration_cast<seconds>(interval_);
auto tv = timeval{};
tv.tv_sec = secs.count();
tv.tv_usec = static_cast<decltype(tv.tv_usec)>(duration_cast<microseconds>(interval_ - secs).count());
evtimer_add(evtimer_, &tv);
evtimer_add(evtimer_.get(), &tv);
}
static void onTimer(evutil_socket_t /*unused*/, short /*unused*/, void* vself)
@ -114,7 +118,7 @@ private:
}
struct event_base* const base_;
struct event* evtimer_ = nullptr;
std::unique_ptr<struct event, EventDeleter> evtimer_;
std::function<void()> callback_;
std::chrono::milliseconds interval_ = 100ms;

View File

@ -48,7 +48,6 @@
#include "torrent-metainfo.h"
#include "torrent.h"
#include "tr-assert.h"
#include "trevent.h" /* tr_runInEventThread() */
#include "utils.h"
#include "version.h"
#include "web-utils.h"
@ -886,7 +885,7 @@ void tr_torrentManualUpdate(tr_torrent* tor)
{
TR_ASSERT(tr_isTorrent(tor));
tr_runInEventThread(tor->session, tr_torrentManualUpdateImpl, tor);
tor->session->runInSessionThread(tr_torrentManualUpdateImpl, tor);
}
bool tr_torrentCanManualUpdate(tr_torrent const* tor)
@ -1384,7 +1383,7 @@ static void torrentStart(tr_torrent* tor, torrent_start_opts opts)
tr_torrentUnsetPeerId(tor);
tor->isRunning = true;
tor->setDirty();
tr_runInEventThread(tor->session, torrentStartImpl, tor);
tor->session->runInSessionThread(torrentStartImpl, tor);
}
void tr_torrentStart(tr_torrent* tor)
@ -1408,7 +1407,7 @@ void tr_torrentStartNow(tr_torrent* tor)
static void onVerifyDoneThreadFunc(tr_torrent* const tor)
{
TR_ASSERT(tr_amInEventThread(tor->session));
TR_ASSERT(tor->session->amInSessionThread());
if (tor->isDeleting)
{
@ -1434,12 +1433,12 @@ void tr_torrentOnVerifyDone(tr_torrent* tor, bool aborted)
return;
}
tr_runInEventThread(tor->session, onVerifyDoneThreadFunc, tor);
tor->session->runInSessionThread(onVerifyDoneThreadFunc, tor);
}
static void verifyTorrent(tr_torrent* const tor)
{
TR_ASSERT(tr_amInEventThread(tor->session));
TR_ASSERT(tor->session->amInSessionThread());
auto const lock = tor->unique_lock();
if (tor->isDeleting)
@ -1470,7 +1469,7 @@ static void verifyTorrent(tr_torrent* const tor)
void tr_torrentVerify(tr_torrent* tor)
{
tr_runInEventThread(tor->session, verifyTorrent, tor);
tor->session->runInSessionThread(verifyTorrent, tor);
}
void tr_torrentSave(tr_torrent* tor)
@ -1487,7 +1486,7 @@ void tr_torrentSave(tr_torrent* tor)
static void stopTorrent(tr_torrent* const tor)
{
TR_ASSERT(tr_isTorrent(tor));
TR_ASSERT(tr_amInEventThread(tor->session));
TR_ASSERT(tor->session->amInSessionThread());
auto const lock = tor->unique_lock();
if (!tor->session->isClosing())
@ -1532,13 +1531,14 @@ void tr_torrentStop(tr_torrent* tor)
tor->isRunning = false;
tor->isStopping = false;
tor->setDirty();
tr_runInEventThread(tor->session, stopTorrent, tor);
tor->session->runInSessionThread(stopTorrent, tor);
}
static void closeTorrent(tr_torrent* const tor)
void tr_torrentFreeInSessionThread(tr_torrent* tor)
{
TR_ASSERT(tr_isTorrent(tor));
TR_ASSERT(tr_amInEventThread(tor->session));
TR_ASSERT(tor->session != nullptr);
TR_ASSERT(tor->session->amInSessionThread());
if (!tor->session->isClosing())
{
@ -1559,26 +1559,12 @@ static void closeTorrent(tr_torrent* const tor)
freeTorrent(tor);
}
void tr_torrentFree(tr_torrent* tor)
{
if (tr_isTorrent(tor))
{
tr_session* session = tor->session;
TR_ASSERT(session != nullptr);
auto const lock = tor->unique_lock();
tr_runInEventThread(session, closeTorrent, tor);
}
}
static bool removeTorrentFile(char const* filename, void* /*user_data*/, tr_error** error)
{
return tr_sys_path_remove(filename, error);
}
static void removeTorrentInEventThread(tr_torrent* tor, bool delete_flag, tr_fileFunc delete_func, void* user_data)
static void removeTorrentInSessionThread(tr_torrent* tor, bool delete_flag, tr_fileFunc delete_func, void* user_data)
{
auto const lock = tor->unique_lock();
@ -1600,7 +1586,7 @@ static void removeTorrentInEventThread(tr_torrent* tor, bool delete_flag, tr_fil
tor->metainfo_.files().remove(tor->currentDir(), tor->name(), delete_func_wrapper);
}
closeTorrent(tor);
tr_torrentFreeInSessionThread(tor);
}
void tr_torrentRemove(tr_torrent* tor, bool delete_flag, tr_fileFunc delete_func, void* user_data)
@ -1609,7 +1595,7 @@ void tr_torrentRemove(tr_torrent* tor, bool delete_flag, tr_fileFunc delete_func
tor->isDeleting = true;
tr_runInEventThread(tor->session, removeTorrentInEventThread, tor, delete_flag, delete_func, user_data);
tor->session->runInSessionThread(removeTorrentInSessionThread, tor, delete_flag, delete_func, user_data);
}
/**
@ -2059,7 +2045,7 @@ uint64_t tr_torrentGetBytesLeftToAllocate(tr_torrent const* tor)
///
static void setLocationInEventThread(
static void setLocationInSessionThread(
tr_torrent* tor,
std::string const& path,
bool move_from_old_path,
@ -2067,7 +2053,7 @@ static void setLocationInEventThread(
int volatile* setme_state)
{
TR_ASSERT(tr_isTorrent(tor));
TR_ASSERT(tr_amInEventThread(tor->session));
TR_ASSERT(tor->session->amInSessionThread());
auto ok = bool{ true };
if (move_from_old_path)
@ -2125,9 +2111,8 @@ void tr_torrent::setLocation(
*setme_state = TR_LOC_MOVING;
}
tr_runInEventThread(
this->session,
setLocationInEventThread,
this->session->runInSessionThread(
setLocationInSessionThread,
this,
std::string{ location },
move_from_old_path,
@ -2236,7 +2221,7 @@ static void tr_torrentPieceCompleted(tr_torrent* tor, tr_piece_index_t piece_ind
void tr_torrentGotBlock(tr_torrent* tor, tr_block_index_t block)
{
TR_ASSERT(tr_isTorrent(tor));
TR_ASSERT(tr_amInEventThread(tor->session));
TR_ASSERT(tor->session->amInSessionThread());
bool const block_is_new = !tor->hasBlock(block);
@ -2626,8 +2611,7 @@ void tr_torrent::renamePath(
tr_torrent_rename_done_func callback,
void* callback_user_data)
{
tr_runInEventThread(
this->session,
this->session->runInSessionThread(
torrentRenamePath,
this,
std::string{ oldpath },

View File

@ -40,10 +40,10 @@ struct tr_torrent;
struct tr_torrent_announcer;
/**
*** Package-visible ctor API
*** Package-visible
**/
void tr_torrentFree(tr_torrent* tor);
void tr_torrentFreeInSessionThread(tr_torrent* tor);
void tr_ctorInitTorrentPriorities(tr_ctor const* ctor, tr_torrent* tor);

View File

@ -1,275 +0,0 @@
// This file Copyright © 2007-2022 Mnemosyne LLC.
// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only),
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#include <chrono>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <utility>
#include <csignal>
#ifdef _WIN32
#include <winsock2.h>
#endif
#include <event2/event.h>
#include <event2/thread.h>
#include "transmission.h"
#include "log.h"
#include "net.h"
#include "session.h"
#include "tr-assert.h"
#include "trevent.h"
#include "utils.h"
/***
****
***/
namespace
{
namespace tr_evthread_init_helpers
{
void* lock_alloc(unsigned /*locktype*/)
{
return new std::recursive_mutex{};
}
void lock_free(void* vlock, unsigned /*locktype*/)
{
delete static_cast<std::recursive_mutex*>(vlock);
}
int lock_lock(unsigned mode, void* vlock)
{
auto* lock = static_cast<std::recursive_mutex*>(vlock);
if ((mode & EVTHREAD_TRY) != 0U)
{
auto const success = lock->try_lock();
return success ? 0 : -1;
}
lock->lock();
return 0;
}
int lock_unlock(unsigned /*mode*/, void* vlock)
{
static_cast<std::recursive_mutex*>(vlock)->unlock();
return 0;
}
void* cond_alloc(unsigned /*condflags*/)
{
return new std::condition_variable_any();
}
void cond_free(void* vcond)
{
delete static_cast<std::condition_variable_any*>(vcond);
}
int cond_signal(void* vcond, int broadcast)
{
auto* cond = static_cast<std::condition_variable_any*>(vcond);
if (broadcast != 0)
{
cond->notify_all();
}
else
{
cond->notify_one();
}
return 0;
}
int cond_wait(void* vcond, void* vlock, struct timeval const* tv)
{
auto* cond = static_cast<std::condition_variable_any*>(vcond);
auto* lock = static_cast<std::recursive_mutex*>(vlock);
if (tv == nullptr)
{
cond->wait(*lock);
return 0;
}
auto const duration = std::chrono::seconds(tv->tv_sec) + std::chrono::microseconds(tv->tv_usec);
auto const success = cond->wait_for(*lock, duration);
return success == std::cv_status::timeout ? 1 : 0;
}
unsigned long thread_current_id()
{
return std::hash<std::thread::id>()(std::this_thread::get_id());
}
auto evthread_flag = std::once_flag{};
void initEvthreadsOnce()
{
tr_net_init();
evthread_lock_callbacks constexpr LockCbs{
EVTHREAD_LOCK_API_VERSION, EVTHREAD_LOCKTYPE_RECURSIVE, lock_alloc, lock_free, lock_lock, lock_unlock
};
evthread_set_lock_callbacks(&LockCbs);
evthread_condition_callbacks constexpr CondCbs{ EVTHREAD_CONDITION_API_VERSION,
cond_alloc,
cond_free,
cond_signal,
cond_wait };
evthread_set_condition_callbacks(&CondCbs);
evthread_set_id_callback(thread_current_id);
}
} // namespace tr_evthread_init_helpers
} // namespace
void tr_evthread_init()
{
using namespace tr_evthread_init_helpers;
std::call_once(evthread_flag, initEvthreadsOnce);
}
/***
****
***/
struct tr_event_handle
{
using callback = std::function<void(void)>;
using work_queue_t = std::list<callback>;
work_queue_t work_queue;
std::condition_variable work_queue_cv;
std::mutex work_queue_mutex;
event* work_queue_event = nullptr;
tr_session* session = nullptr;
std::thread::id thread_id;
};
static void onWorkAvailable(evutil_socket_t /*fd*/, short /*flags*/, void* vsession)
{
// invariant
auto* const session = static_cast<tr_session*>(vsession);
TR_ASSERT(tr_amInEventThread(session));
// steal the work queue
auto* events = session->events;
auto work_queue_lock = std::unique_lock(events->work_queue_mutex);
auto work_queue = tr_event_handle::work_queue_t{};
std::swap(work_queue, events->work_queue);
work_queue_lock.unlock();
// process the work queue
for (auto const& func : work_queue)
{
func();
}
}
static void libeventThreadFunc(tr_event_handle* events)
{
#ifndef _WIN32
/* Don't exit when writing on a broken socket */
(void)signal(SIGPIPE, SIG_IGN);
#endif
tr_evthread_init();
// create the libevent base
auto* base = events->session->eventBase();
// initialize the session struct's event fields
events->work_queue_event = event_new(base, -1, 0, onWorkAvailable, events->session);
events->session->events = events;
// tell the thread that's waiting in tr_eventInit()
// that this thread is ready for business
events->work_queue_cv.notify_one();
// loop until `tr_eventClose()` kills the loop
event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY);
// shut down the thread
event_free(events->work_queue_event);
events->session->events = nullptr;
delete events;
tr_logAddTrace("Closing libevent thread");
}
void tr_eventInit(tr_session* session)
{
session->events = nullptr;
auto* const events = new tr_event_handle();
events->session = session;
auto lock = std::unique_lock(events->work_queue_mutex);
auto thread = std::thread(libeventThreadFunc, events);
events->thread_id = thread.get_id();
thread.detach();
// wait until the libevent thread is running
events->work_queue_cv.wait(lock, [session] { return session->events != nullptr; });
}
void tr_eventClose(tr_session* session)
{
TR_ASSERT(session != nullptr);
auto* events = session->events;
if (events == nullptr)
{
return;
}
event_base_loopexit(session->eventBase(), nullptr);
tr_logAddTrace("closing trevent pipe");
}
/**
***
**/
bool tr_amInEventThread(tr_session const* session)
{
TR_ASSERT(session != nullptr);
TR_ASSERT(session->events != nullptr);
return std::this_thread::get_id() == session->events->thread_id;
}
/**
***
**/
void tr_runInEventThread(tr_session* session, std::function<void(void)>&& func)
{
TR_ASSERT(session != nullptr);
auto* events = session->events;
TR_ASSERT(events != nullptr);
if (tr_amInEventThread(session))
{
func();
}
else
{
auto lock = std::unique_lock(events->work_queue_mutex);
events->work_queue.emplace_back(std::move(func));
lock.unlock();
event_active(events->work_queue_event, 0, {});
}
}

View File

@ -1,39 +0,0 @@
// This file Copyright © 2007-2022 Mnemosyne LLC.
// It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only),
// or any future license endorsed by Mnemosyne LLC.
// License text can be found in the licenses/ folder.
#pragma once
#ifndef __TRANSMISSION__
#error only libtransmission should #include this header.
#endif
#include <functional>
#include <tuple>
#include <utility>
#include "tr-macros.h"
struct tr_session;
void tr_evthread_init();
void tr_eventInit(tr_session* session);
void tr_eventClose(tr_session* session);
bool tr_amInEventThread(tr_session const* session);
void tr_runInEventThread(tr_session* session, std::function<void(void)>&& func);
template<typename Func, typename... Args>
void tr_runInEventThread(tr_session* session, Func&& func, Args&&... args)
{
tr_runInEventThread(
session,
std::function<void(void)>{ [func = std::forward<Func&&>(func), args = std::make_tuple(std::forward<Args>(args)...)]()
{
std::apply(std::move(func), std::move(args));
} });
}

View File

@ -154,7 +154,7 @@ public:
curl_thread->join();
}
void closeSoon()
void startShutdown()
{
run_mode = RunMode::CloseSoon;
queued_tasks_cv.notify_one();
@ -635,7 +635,7 @@ bool tr_web::isClosed() const noexcept
return impl_->isClosed();
}
void tr_web::closeSoon()
void tr_web::startShutdown()
{
impl_->closeSoon();
impl_->startShutdown();
}

View File

@ -91,7 +91,7 @@ public:
// Notify tr_web that it's going to be destroyed soon.
// New fetch() tasks will be rejected, but already-running tasks
// are left alone so that they can finish.
void closeSoon();
void startShutdown();
// True when tr_web is ready to be destroyed.
// Will never be true until after closeSoon() is called.

View File

@ -25,7 +25,6 @@
#include "peer-mgr.h"
#include "timer.h"
#include "torrent.h"
#include "trevent.h" // tr_runInEventThread()
#include "utils.h"
#include "web-utils.h"
#include "web.h"
@ -396,7 +395,7 @@ void useFetchedBlocks(tr_webseed_task* task)
block_buf->resize(block_size);
evbuffer_remove(task->content(), std::data(*block_buf), std::size(*block_buf));
auto* const data = new write_block_data{ session, tor->id(), task->loc.block, block_buf, webseed };
tr_runInEventThread(session, &write_block_data::write_block_func, data);
session->runInSessionThread(&write_block_data::write_block_func, data);
}
task->loc = tor->byteLoc(task->loc.byte + block_size);

View File

@ -105,7 +105,7 @@ TEST_P(IncompleteDirTest, incompleteDir)
data.buf = std::make_unique<std::vector<uint8_t>>(tr_block_info::BlockSize, '\0');
data.block = block_index;
data.done = false;
tr_runInEventThread(session_, test_incomplete_dir_threadfunc, &data);
session_->runInSessionThread(test_incomplete_dir_threadfunc, &data);
auto const test = [&data]()
{

View File

@ -23,7 +23,6 @@
#include "platform.h" // TR_PATH_DELIMITER
#include "quark.h"
#include "torrent.h"
#include "trevent.h" // tr_amInEventThread()
#include "utils.h"
#include "variant.h"
@ -490,7 +489,7 @@ protected:
void blockingTorrentVerify(tr_torrent* tor) const
{
EXPECT_NE(nullptr, tor->session);
EXPECT_FALSE(tr_amInEventThread(tor->session));
EXPECT_FALSE(tor->session->amInSessionThread());
auto const n_previously_verified = std::size(verified_);
tr_torrentVerify(tor);
waitFor(