fix: more thread safety updates (#4136)

* fix: thread safety in tr_sessionLoadTorrents()

* refactor: use std::future in tr_sessionSet()

* refactor: use std::future in tr_sessionClose()
This commit is contained in:
Charles Kerr 2022-11-10 12:04:49 -06:00 committed by GitHub
parent 6f43883d8b
commit d61e31c419
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 85 deletions

View File

@ -1594,8 +1594,8 @@ void tr_announcer::upkeep()
{
auto const lock = session->unique_lock();
bool const is_closing = session->isClosed();
time_t const now = tr_time();
auto const is_closing = session->isClosing();
auto const now = tr_time();
/* maybe send out some "stopped" messages for closed torrents */
flushCloseMessages(this);

View File

@ -10,6 +10,7 @@
#include <cstdint>
#include <cstdlib> // atoi()
#include <ctime>
#include <future>
#include <iterator> // for std::back_inserter
#include <list>
#include <memory>
@ -660,25 +661,16 @@ void tr_session::setSettings(tr_session_settings settings_in, bool force)
void tr_sessionSet(tr_session* session, tr_variant* settings)
{
// run it in the libtransmission thread
if (session->amInSessionThread())
{
session->setSettings(settings, false);
}
else
{
auto lock = session->unique_lock();
auto done_cv = std::condition_variable_any{};
session->runInSessionThread(
[&session, &settings, &done_cv]()
{
session->setSettings(settings, false);
done_cv.notify_one();
});
done_cv.wait(lock);
}
// do the work in the session thread
auto done_promise = std::promise<void>{};
auto done_future = done_promise.get_future();
session->runInSessionThread(
[&session, &settings, &done_promise]()
{
session->setSettings(settings, false);
done_promise.set_value();
});
done_future.wait();
}
/***
@ -1160,13 +1152,7 @@ double tr_sessionGetRawSpeed_KBps(tr_session const* session, tr_direction dir)
return tr_toSpeedKBps(tr_sessionGetRawSpeed_Bps(session, dir));
}
struct tr_session::is_closed_data
{
std::mutex is_closed_mutex;
std::condition_variable is_closed_cv;
};
void tr_session::closeImplPart1(is_closed_data* closed_data)
void tr_session::closeImplPart1(std::promise<void>* closed_promise)
{
is_closing_ = true;
@ -1215,11 +1201,11 @@ void tr_session::closeImplPart1(is_closed_data* closed_data)
// recycle the now-unused save_timer_ here to wait for UDP shutdown
TR_ASSERT(!save_timer_);
save_timer_ = timerMaker().create([this, closed_data]() { closeImplPart2(closed_data); });
save_timer_ = timerMaker().create([this, closed_promise]() { closeImplPart2(closed_promise); });
save_timer_->startRepeating(50ms);
}
void tr_session::closeImplPart2(is_closed_data* closed_data)
void tr_session::closeImplPart2(std::promise<void>* closed_promise)
{
// try to keep the UDP announcer alive long enough to send out
// all the &event=stopped tracker announces
@ -1240,10 +1226,7 @@ void tr_session::closeImplPart2(is_closed_data* closed_data)
openFiles().closeAll();
// tada we are done!
closed_data->is_closed_mutex.lock();
is_closed_ = true;
closed_data->is_closed_mutex.unlock();
closed_data->is_closed_cv.notify_one();
closed_promise->set_value();
}
void tr_sessionClose(tr_session* session)
@ -1253,30 +1236,21 @@ void tr_sessionClose(tr_session* session)
tr_logAddInfo(fmt::format(_("Transmission version {version} shutting down"), fmt::arg("version", LONG_VERSION_STRING)));
auto closed_data = tr_session::is_closed_data{};
auto lock = std::unique_lock{ closed_data.is_closed_mutex };
session->runInSessionThread([session, &closed_data]() { session->closeImplPart1(&closed_data); });
closed_data.is_closed_cv.wait_for(lock, 12s, [session]() { return session->is_closed_.load(); });
auto closed_promise = std::promise<void>{};
auto closed_future = closed_promise.get_future();
session->runInSessionThread([session, &closed_promise]() { session->closeImplPart1(&closed_promise); });
closed_future.wait_for(12s);
delete session;
}
struct sessionLoadTorrentsData
static void sessionLoadTorrents(tr_session* session, tr_ctor* ctor, std::promise<size_t>* loaded_promise)
{
tr_session* session;
tr_ctor* ctor;
bool done;
};
static void sessionLoadTorrents(struct sessionLoadTorrentsData* const data)
{
TR_ASSERT(data->session != nullptr);
auto const& dirname = data->session->torrentDir();
auto const& dirname = session->torrentDir();
auto const info = tr_sys_path_get_info(dirname);
auto const odir = info && info->isFolder() ? tr_sys_dir_open(dirname.c_str()) : TR_BAD_SYS_DIR;
auto torrents = std::list<tr_torrent*>{};
auto n_torrents = size_t{};
if (odir != TR_BAD_SYS_DIR)
{
char const* name = nullptr;
@ -1290,44 +1264,43 @@ static void sessionLoadTorrents(struct sessionLoadTorrentsData* const data)
auto const path = tr_pathbuf{ dirname, '/', name };
// is a magnet link?
if (!tr_ctorSetMetainfoFromFile(data->ctor, path.sv(), nullptr))
if (!tr_ctorSetMetainfoFromFile(ctor, path.sv(), nullptr))
{
if (auto buf = std::vector<char>{}; tr_loadFile(path, buf))
{
tr_ctorSetMetainfoFromMagnetLink(data->ctor, std::string_view{ std::data(buf), std::size(buf) }, nullptr);
tr_ctorSetMetainfoFromMagnetLink(ctor, std::string_view{ std::data(buf), std::size(buf) }, nullptr);
}
}
if (tr_torrent* const tor = tr_torrentNew(data->ctor, nullptr); tor != nullptr)
if (tr_torrent* const tor = tr_torrentNew(ctor, nullptr); tor != nullptr)
{
torrents.push_back(tor);
++n_torrents;
}
}
tr_sys_dir_close(odir);
}
if (auto const n = std::size(torrents); n != 0U)
if (n_torrents != 0U)
{
tr_logAddInfo(fmt::format(ngettext("Loaded {count} torrent", "Loaded {count} torrents", n), fmt::arg("count", n)));
tr_logAddInfo(fmt::format(
ngettext("Loaded {count} torrent", "Loaded {count} torrents", n_torrents),
fmt::arg("count", n_torrents)));
}
data->done = true;
loaded_promise->set_value(n_torrents);
}
size_t tr_sessionLoadTorrents(tr_session* session, tr_ctor* ctor)
{
auto data = sessionLoadTorrentsData{};
data.session = session;
data.ctor = ctor;
data.done = false;
session->runInSessionThread(sessionLoadTorrents, &data);
while (!data.done)
{
tr_wait_msec(100);
}
auto loaded_promise = std::promise<size_t>{};
auto loaded_future = loaded_promise.get_future();
return std::size(session->torrents());
session->runInSessionThread(sessionLoadTorrents, session, ctor, &loaded_promise);
loaded_future.wait();
auto const n_torrents = loaded_future.get();
return n_torrents;
}
size_t tr_sessionGetAllTorrents(tr_session* session, tr_torrent** buf, size_t buflen)

View File

@ -12,9 +12,9 @@
#define TR_NAME "Transmission"
#include <array>
#include <atomic>
#include <cstddef> // size_t
#include <cstdint> // uintX_t
#include <future>
#include <memory>
#include <mutex>
#include <optional>
@ -317,7 +317,7 @@ public:
[[nodiscard]] auto unique_lock() const
{
return std::unique_lock(session_mutex);
return std::unique_lock(session_mutex_);
}
// paths
@ -680,11 +680,6 @@ public:
return is_closing_;
}
[[nodiscard]] bool isClosed() const noexcept
{
return is_closed_;
}
[[nodiscard]] constexpr auto encryptionMode() const noexcept
{
return settings_.encryption_mode;
@ -898,9 +893,8 @@ private:
void setSettings(tr_variant* settings_dict, bool force);
void setSettings(tr_session_settings settings, bool force);
struct is_closed_data;
void closeImplPart1(is_closed_data*);
void closeImplPart2(is_closed_data*);
void closeImplPart1(std::promise<void>* closed_promise);
void closeImplPart2(std::promise<void>* closed_promise);
void onNowTimer();
@ -1001,10 +995,6 @@ private:
// depends-on: session_thread_
std::unique_ptr<libtransmission::TimerMaker> const timer_maker_;
/// static fields
static inline std::recursive_mutex session_mutex;
/// trivial type fields
tr_session_settings settings_;
@ -1046,10 +1036,11 @@ private:
uint16_t peer_count_ = 0;
bool is_closing_ = false;
std::atomic<bool> is_closed_ = false;
/// fields that aren't trivial,
/// but are self-contained / have no interdependencies
/// but are self-contained / don't hold references to others
mutable std::recursive_mutex session_mutex_;
tr_stats session_stats_{ config_dir_, time(nullptr) };

View File

@ -102,7 +102,7 @@ public:
}
private:
[[nodiscard]] constexpr short events(bool is_repeating) noexcept
[[nodiscard]] static constexpr short events(bool is_repeating) noexcept
{
return static_cast<short>(EV_TIMEOUT | (is_repeating ? EV_PERSIST : 0));
}

View File

@ -225,7 +225,7 @@ void tr_utpInit(tr_session* session)
bool tr_utpPacket(unsigned char const* buf, size_t buflen, struct sockaddr const* from, socklen_t fromlen, tr_session* ss)
{
if (!ss->isClosed() && !ss->utp_timer)
if (!ss->isClosing() && !ss->utp_timer)
{
ss->utp_timer = ss->timerMaker().create(timer_callback, ss);
reset_timer(ss);