1
0
Fork 0
mirror of https://github.com/transmission/transmission synced 2025-02-23 14:40:43 +00:00

refactor: move the verify worker thread into a wrapper class ()

This commit is contained in:
Charles Kerr 2022-09-05 23:43:59 -05:00 committed by GitHub
parent 4adcb73bb0
commit 82212ff1d8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 191 additions and 134 deletions

View file

@ -1817,7 +1817,7 @@ void tr_session::closeImplStart()
save_timer_.reset();
now_timer_.reset();
tr_verifyClose(this);
verifier_.reset();
tr_sharedClose(*this);
close_incoming_peer_port(this);
@ -2950,4 +2950,6 @@ tr_session::tr_session(std::string_view config_dir)
stats().saveIfDirty();
});
save_timer_->startRepeating(SaveIntervalSecs);
verifier_->addCallback(tr_torrentOnVerifyDone);
}

View file

@ -39,6 +39,7 @@
#include "torrents.h"
#include "tr-lpd.h"
#include "web.h"
#include "verify.h"
enum tr_auto_switch_state_t
{
@ -60,6 +61,13 @@ struct struct_utp_context;
struct tr_announcer;
struct tr_announcer_udp;
namespace libtransmission::test
{
class SessionTest;
} // namespace libtransmission::test
struct tr_bindinfo
{
explicit tr_bindinfo(tr_address addr)
@ -698,6 +706,22 @@ public:
return peer_id_ttl_hours_;
}
void verifyRemove(tr_torrent* tor)
{
if (verifier_)
{
verifier_->remove(tor);
}
}
void verifyAdd(tr_torrent* tor)
{
if (verifier_)
{
verifier_->add(tor);
}
}
private:
[[nodiscard]] tr_port randomPort() const;
@ -710,6 +734,7 @@ private:
void closeImplWaitForIdleUdp();
void closeImplFinish();
friend class libtransmission::test::SessionTest;
friend bool tr_blocklistExists(tr_session const* session);
friend bool tr_sessionGetAntiBruteForceEnabled(tr_session const* session);
friend bool tr_sessionIsRPCEnabled(tr_session const* session);
@ -888,6 +913,8 @@ private:
tr_torrents torrents_;
std::unique_ptr<tr_verify_worker> verifier_ = std::make_unique<tr_verify_worker>();
std::array<std::string, TR_SCRIPT_N_TYPES> scripts_;
std::string const config_dir_;

View file

@ -50,7 +50,6 @@
#include "tr-assert.h"
#include "trevent.h" /* tr_runInEventThread() */
#include "utils.h"
#include "verify.h"
#include "version.h"
#include "web-utils.h"
@ -1480,7 +1479,7 @@ static void onVerifyDoneThreadFunc(tr_torrent* const tor)
}
}
static void onVerifyDone(tr_torrent* tor, bool aborted, void* /*unused*/)
void tr_torrentOnVerifyDone(tr_torrent* tor, bool aborted)
{
if (aborted || tor->isDeleting)
{
@ -1501,7 +1500,7 @@ static void verifyTorrent(tr_torrent* const tor)
}
/* if the torrent's already being verified, stop it */
tr_verifyRemove(tor);
tor->session->verifyRemove(tor);
bool const startAfter = (tor->isRunning || tor->startAfterVerify) && !tor->isStopping;
@ -1517,7 +1516,7 @@ static void verifyTorrent(tr_torrent* const tor)
else
{
tor->startAfterVerify = startAfter;
tr_verifyAdd(tor, onVerifyDone, nullptr);
tor->session->verifyAdd(tor);
}
}
@ -1548,7 +1547,8 @@ static void stopTorrent(tr_torrent* const tor)
tr_logAddInfoTor(tor, _("Pausing torrent"));
}
tr_verifyRemove(tor);
tor->session->verifyRemove(tor);
tr_peerMgrStopTorrent(tor);
tr_announcerTorrentStopped(tor);
@ -1633,7 +1633,7 @@ static void removeTorrentInEventThread(tr_torrent* tor, bool delete_flag, tr_fil
{
// ensure the files are all closed and idle before moving
tor->session->closeTorrentFiles(tor);
tr_verifyRemove(tor);
tor->session->verifyRemove(tor);
if (delete_func == nullptr)
{
@ -2126,7 +2126,7 @@ static void setLocationInEventThread(
// ensure the files are all closed and idle before moving
tor->session->closeTorrentFiles(tor);
tr_verifyRemove(tor);
tor->session->verifyRemove(tor);
tr_error* error = nullptr;
ok = tor->metainfo_.files().move(tor->currentDir(), path, setme_progress, tor->name(), &error);

View file

@ -781,6 +781,8 @@ void tr_ctorSetBandwidthPriority(tr_ctor* ctor, tr_priority_t priority);
tr_priority_t tr_ctorGetBandwidthPriority(tr_ctor const* ctor);
tr_torrent::labels_t const& tr_ctorGetLabels(tr_ctor const* ctor);
void tr_torrentOnVerifyDone(tr_torrent* tor, bool aborted);
#define tr_logAddCriticalTor(tor, msg) tr_logAddCritical(msg, (tor)->name())
#define tr_logAddErrorTor(tor, msg) tr_logAddError(msg, (tor)->name())
#define tr_logAddWarnTor(tor, msg) tr_logAddWarn(msg, (tor)->name())

View file

@ -24,13 +24,39 @@
#include "utils.h" // tr_time(), tr_wait_msec()
#include "verify.h"
/***
****
***/
namespace
{
static auto constexpr MsecToSleepPerSecondDuringVerify = int{ 100 };
auto constexpr MsecToSleepPerSecondDuringVerify = int{ 100 };
static bool verifyTorrent(tr_torrent* tor, bool const* stopFlag)
}
int tr_verify_worker::Node::compare(tr_verify_worker::Node const& that) const
{
// higher priority comes before lower priority
auto const pa = tr_torrentGetPriority(torrent);
auto const pb = tr_torrentGetPriority(that.torrent);
if (pa != pb)
{
return pa > pb ? -1 : 1;
}
// smaller torrents come before larger ones because they verify faster
if (current_size != that.current_size)
{
return current_size < that.current_size ? -1 : 1;
}
// tertiary compare just to ensure they don't compare equal
if (torrent->id() != that.torrent->id())
{
return torrent->id() < that.torrent->id() ? -1 : 1;
}
return 0;
}
bool tr_verify_worker::verifyTorrent(tr_torrent* tor, bool const* stop_flag)
{
auto const begin = tr_time();
@ -48,7 +74,7 @@ static bool verifyTorrent(tr_torrent* tor, bool const* stopFlag)
tr_logAddDebugTor(tor, "verifying torrent...");
while (!*stopFlag && piece < tor->pieceCount())
while (!*stop_flag && piece < tor->pieceCount())
{
auto const file_length = tor->fileSize(file_index);
@ -151,129 +177,74 @@ static bool verifyTorrent(tr_torrent* tor, bool const* stopFlag)
return changed;
}
/***
****
***/
struct verify_node
{
tr_torrent* torrent;
tr_verify_done_func callback_func;
void* callback_data;
uint64_t current_size;
[[nodiscard]] int compare(verify_node const& that) const
{
// higher priority comes before lower priority
auto const pa = tr_torrentGetPriority(torrent);
auto const pb = tr_torrentGetPriority(that.torrent);
if (pa != pb)
{
return pa > pb ? -1 : 1;
}
// smaller torrents come before larger ones because they verify faster
if (current_size != that.current_size)
{
return current_size < that.current_size ? -1 : 1;
}
// tertiary compare just to ensure they don't compare equal
if (torrent->infoHash() != that.torrent->infoHash())
{
return torrent->infoHash() < that.torrent->infoHash() ? -1 : 1;
}
return 0;
}
bool operator<(verify_node const& that) const
{
return compare(that) < 0;
}
};
static struct verify_node currentNode;
// TODO: refactor s.t. this doesn't leak
static auto& verify_list{ *new std::set<verify_node>{} };
static std::optional<std::thread::id> verify_thread_id;
static bool stopCurrent = false;
static std::mutex verify_mutex_;
static void verifyThreadFunc()
void tr_verify_worker::verifyThreadFunc()
{
for (;;)
{
{
auto const lock = std::lock_guard(verify_mutex_);
stopCurrent = false;
if (std::empty(verify_list))
stop_current_ = false;
if (std::empty(todo_))
{
currentNode.torrent = nullptr;
verify_thread_id.reset();
current_node_.reset();
verify_thread_id_.reset();
return;
}
auto const it = std::begin(verify_list);
currentNode = *it;
verify_list.erase(it);
auto const it = std::begin(todo_);
current_node_ = *it;
todo_.erase(it);
}
tr_torrent* tor = currentNode.torrent;
auto* const tor = current_node_->torrent;
tr_logAddTraceTor(tor, "Verifying torrent");
tor->setVerifyState(TR_VERIFY_NOW);
auto const changed = verifyTorrent(tor, &stopCurrent);
auto const changed = verifyTorrent(tor, &stop_current_);
tor->setVerifyState(TR_VERIFY_NONE);
TR_ASSERT(tr_isTorrent(tor));
if (!stopCurrent && changed)
if (!stop_current_ && changed)
{
tor->setDirty();
}
if (currentNode.callback_func != nullptr)
{
(*currentNode.callback_func)(tor, stopCurrent, currentNode.callback_data);
}
callCallback(tor, stop_current_);
}
}
void tr_verifyAdd(tr_torrent* tor, tr_verify_done_func callback_func, void* callback_data)
void tr_verify_worker::add(tr_torrent* tor)
{
TR_ASSERT(tr_isTorrent(tor));
tr_logAddTraceTor(tor, "Queued for verification");
auto node = verify_node{};
auto node = Node{};
node.torrent = tor;
node.callback_func = callback_func;
node.callback_data = callback_data;
node.current_size = tor->hasTotal();
auto const lock = std::lock_guard(verify_mutex_);
tor->setVerifyState(TR_VERIFY_WAIT);
verify_list.insert(node);
todo_.insert(node);
if (!verify_thread_id)
if (!verify_thread_id_)
{
auto thread = std::thread(verifyThreadFunc);
verify_thread_id = thread.get_id();
auto thread = std::thread(&tr_verify_worker::verifyThreadFunc, this);
verify_thread_id_ = thread.get_id();
thread.detach();
}
}
void tr_verifyRemove(tr_torrent* tor)
void tr_verify_worker::remove(tr_torrent* tor)
{
TR_ASSERT(tr_isTorrent(tor));
verify_mutex_.lock();
if (tor == currentNode.torrent)
if (current_node_ && current_node_->torrent == tor)
{
stopCurrent = true;
stop_current_ = true;
while (stopCurrent)
while (stop_current_)
{
verify_mutex_.unlock();
tr_wait_msec(100);
@ -282,31 +253,32 @@ void tr_verifyRemove(tr_torrent* tor)
}
else
{
auto const it = std::find_if(
std::begin(verify_list),
std::end(verify_list),
auto const iter = std::find_if(
std::begin(todo_),
std::end(todo_),
[tor](auto const& task) { return tor == task.torrent; });
tor->setVerifyState(TR_VERIFY_NONE);
if (it != std::end(verify_list))
if (iter != std::end(todo_))
{
if (it->callback_func != nullptr)
{
(*it->callback_func)(tor, true, it->callback_data);
}
verify_list.erase(it);
callCallback(tor, true);
todo_.erase(iter);
}
}
verify_mutex_.unlock();
}
void tr_verifyClose(tr_session* /*session*/)
tr_verify_worker::~tr_verify_worker()
{
auto const lock = std::lock_guard(verify_mutex_);
stopCurrent = true;
verify_list.clear();
stop_current_ = true;
todo_.clear();
while (verify_thread_id_.has_value())
{
tr_wait_msec(20);
}
}

View file

@ -9,20 +9,64 @@
#error only libtransmission should #include this header.
#endif
#include <cstdint>
#include <functional>
#include <list>
#include <mutex>
#include <optional>
#include <set>
#include <thread>
struct tr_session;
struct tr_torrent;
/**
* @addtogroup file_io File IO
* @{
*/
class tr_verify_worker
{
public:
using callback_func = std::function<void(tr_torrent*, bool aborted)>;
using tr_verify_done_func = void (*)(tr_torrent*, bool aborted, void* user_data);
~tr_verify_worker();
void tr_verifyAdd(tr_torrent* tor, tr_verify_done_func callback_func, void* callback_data);
void addCallback(callback_func callback)
{
callbacks_.emplace_back(std::move(callback));
}
void tr_verifyRemove(tr_torrent* tor);
void add(tr_torrent* tor);
void tr_verifyClose(tr_session*);
void remove(tr_torrent* tor);
/* @} */
private:
struct Node
{
tr_torrent* torrent = nullptr;
uint64_t current_size = 0;
[[nodiscard]] int compare(Node const& that) const;
[[nodiscard]] bool operator<(Node const& that) const
{
return compare(that) < 0;
}
};
void callCallback(tr_torrent* tor, bool aborted)
{
for (auto& callback : callbacks_)
{
callback(tor, aborted);
}
}
void verifyThreadFunc();
[[nodiscard]] static bool verifyTorrent(tr_torrent* tor, bool const* stop_flag);
std::list<callback_func> callbacks_;
std::mutex verify_mutex_;
std::set<Node> todo_;
std::optional<Node> current_node_;
std::optional<std::thread::id> verify_thread_id_;
bool stop_current_ = false;
};

View file

@ -1388,9 +1388,13 @@ TEST_F(FileTest, dirOpen)
odir = tr_sys_dir_open(test_dir);
EXPECT_NE(TR_BAD_SYS_DIR, odir);
auto files = std::set<std::string>{};
char const* filename = nullptr;
while ((filename = tr_sys_dir_read_name(odir, &err)))
for (;;)
{
char const* const filename = tr_sys_dir_read_name(odir, &err);
if (filename == nullptr)
{
break;
}
files.insert(filename);
}
EXPECT_EQ(3U, files.size());

View file

@ -66,7 +66,7 @@ protected:
sync();
}
static tr_torrent* createTorrentFromBase64Metainfo(tr_ctor* ctor, char const* benc_base64)
tr_torrent* createTorrentFromBase64Metainfo(tr_ctor* ctor, char const* benc_base64)
{
// create the torrent ctor
auto const benc = tr_base64_decode(benc_base64);
@ -77,10 +77,8 @@ protected:
tr_ctorSetPaused(ctor, TR_FORCE, true);
// create the torrent
auto* const tor = tr_torrentNew(ctor, nullptr);
auto* const tor = createTorrentAndWaitForVerifyDone(ctor);
EXPECT_NE(nullptr, tor);
// cleanup
return tor;
}

View file

@ -380,7 +380,19 @@ protected:
Complete
};
tr_torrent* zeroTorrentInit(ZeroTorrentState state) const
[[nodiscard]] tr_torrent* createTorrentAndWaitForVerifyDone(tr_ctor* ctor) const
{
auto const n_previously_verified = std::size(verified_);
auto* const tor = tr_torrentNew(ctor, nullptr);
EXPECT_NE(nullptr, tor);
waitFor(
[this, tor, n_previously_verified]()
{ return std::size(verified_) > n_previously_verified && verified_.back() == tor; },
20s);
return tor;
}
[[nodiscard]] tr_torrent* zeroTorrentInit(ZeroTorrentState state) const
{
// 1048576 files-filled-with-zeroes/1048576
// 4096 files-filled-with-zeroes/4096
@ -445,29 +457,21 @@ protected:
}
}
// create the torrent
auto* const tor = tr_torrentNew(ctor, nullptr);
EXPECT_NE(nullptr, tor);
waitForVerify(tor);
// cleanup
auto* const tor = createTorrentAndWaitForVerifyDone(ctor);
tr_ctorFree(ctor);
return tor;
}
void waitForVerify(tr_torrent* tor) const
{
EXPECT_NE(nullptr, tor->session);
tr_wait_msec(100);
EXPECT_TRUE(waitFor([tor]() { return tor->verifyState() == TR_VERIFY_NONE && tor->checked_pieces_.hasAll(); }, 4000));
}
void blockingTorrentVerify(tr_torrent* tor) const
{
EXPECT_NE(nullptr, tor->session);
EXPECT_FALSE(tr_amInEventThread(tor->session));
auto const n_previously_verified = std::size(verified_);
tr_torrentVerify(tor);
waitForVerify(tor);
waitFor(
[this, tor, n_previously_verified]()
{ return std::size(verified_) > n_previously_verified && verified_.back() == tor; },
20s);
}
tr_session* session_ = nullptr;
@ -494,6 +498,7 @@ protected:
SandboxedTest::SetUp();
session_ = sessionInit(settings());
session_->verifier_->addCallback([this](tr_torrent* tor, bool /*aborted*/) { verified_.emplace_back(tor); });
}
virtual void TearDown() override
@ -504,6 +509,9 @@ protected:
SandboxedTest::TearDown();
}
private:
std::vector<tr_torrent*> verified_;
};
} // namespace test