From 82212ff1d894ea228a542cda5f6c8aa33f4b5dd5 Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Mon, 5 Sep 2022 23:43:59 -0500 Subject: [PATCH] refactor: move the verify worker thread into a wrapper class (#3775) --- libtransmission/session.cc | 4 +- libtransmission/session.h | 27 +++++ libtransmission/torrent.cc | 14 +-- libtransmission/torrent.h | 2 + libtransmission/verify.cc | 164 +++++++++++--------------- libtransmission/verify.h | 62 ++++++++-- tests/libtransmission/file-test.cc | 8 +- tests/libtransmission/rename-test.cc | 6 +- tests/libtransmission/test-fixtures.h | 38 +++--- 9 files changed, 191 insertions(+), 134 deletions(-) diff --git a/libtransmission/session.cc b/libtransmission/session.cc index 4bb17b29e..f90439aa2 100644 --- a/libtransmission/session.cc +++ b/libtransmission/session.cc @@ -1817,7 +1817,7 @@ void tr_session::closeImplStart() save_timer_.reset(); now_timer_.reset(); - tr_verifyClose(this); + verifier_.reset(); tr_sharedClose(*this); close_incoming_peer_port(this); @@ -2950,4 +2950,6 @@ tr_session::tr_session(std::string_view config_dir) stats().saveIfDirty(); }); save_timer_->startRepeating(SaveIntervalSecs); + + verifier_->addCallback(tr_torrentOnVerifyDone); } diff --git a/libtransmission/session.h b/libtransmission/session.h index ec3746f3e..c105894c9 100644 --- a/libtransmission/session.h +++ b/libtransmission/session.h @@ -39,6 +39,7 @@ #include "torrents.h" #include "tr-lpd.h" #include "web.h" +#include "verify.h" enum tr_auto_switch_state_t { @@ -60,6 +61,13 @@ struct struct_utp_context; struct tr_announcer; struct tr_announcer_udp; +namespace libtransmission::test +{ + +class SessionTest; + +} // namespace libtransmission::test + struct tr_bindinfo { explicit tr_bindinfo(tr_address addr) @@ -698,6 +706,22 @@ public: return peer_id_ttl_hours_; } + void verifyRemove(tr_torrent* tor) + { + if (verifier_) + { + verifier_->remove(tor); + } + } + + void verifyAdd(tr_torrent* tor) + { + if (verifier_) + { + verifier_->add(tor); + } + } + private: [[nodiscard]] tr_port randomPort() const; @@ -710,6 +734,7 @@ private: void closeImplWaitForIdleUdp(); void closeImplFinish(); + friend class libtransmission::test::SessionTest; friend bool tr_blocklistExists(tr_session const* session); friend bool tr_sessionGetAntiBruteForceEnabled(tr_session const* session); friend bool tr_sessionIsRPCEnabled(tr_session const* session); @@ -888,6 +913,8 @@ private: tr_torrents torrents_; + std::unique_ptr verifier_ = std::make_unique(); + std::array scripts_; std::string const config_dir_; diff --git a/libtransmission/torrent.cc b/libtransmission/torrent.cc index fec4b9e81..ed4f37460 100644 --- a/libtransmission/torrent.cc +++ b/libtransmission/torrent.cc @@ -50,7 +50,6 @@ #include "tr-assert.h" #include "trevent.h" /* tr_runInEventThread() */ #include "utils.h" -#include "verify.h" #include "version.h" #include "web-utils.h" @@ -1480,7 +1479,7 @@ static void onVerifyDoneThreadFunc(tr_torrent* const tor) } } -static void onVerifyDone(tr_torrent* tor, bool aborted, void* /*unused*/) +void tr_torrentOnVerifyDone(tr_torrent* tor, bool aborted) { if (aborted || tor->isDeleting) { @@ -1501,7 +1500,7 @@ static void verifyTorrent(tr_torrent* const tor) } /* if the torrent's already being verified, stop it */ - tr_verifyRemove(tor); + tor->session->verifyRemove(tor); bool const startAfter = (tor->isRunning || tor->startAfterVerify) && !tor->isStopping; @@ -1517,7 +1516,7 @@ static void verifyTorrent(tr_torrent* const tor) else { tor->startAfterVerify = startAfter; - tr_verifyAdd(tor, onVerifyDone, nullptr); + tor->session->verifyAdd(tor); } } @@ -1548,7 +1547,8 @@ static void stopTorrent(tr_torrent* const tor) tr_logAddInfoTor(tor, _("Pausing torrent")); } - tr_verifyRemove(tor); + tor->session->verifyRemove(tor); + tr_peerMgrStopTorrent(tor); tr_announcerTorrentStopped(tor); @@ -1633,7 +1633,7 @@ static void removeTorrentInEventThread(tr_torrent* tor, bool delete_flag, tr_fil { // ensure the files are all closed and idle before moving tor->session->closeTorrentFiles(tor); - tr_verifyRemove(tor); + tor->session->verifyRemove(tor); if (delete_func == nullptr) { @@ -2126,7 +2126,7 @@ static void setLocationInEventThread( // ensure the files are all closed and idle before moving tor->session->closeTorrentFiles(tor); - tr_verifyRemove(tor); + tor->session->verifyRemove(tor); tr_error* error = nullptr; ok = tor->metainfo_.files().move(tor->currentDir(), path, setme_progress, tor->name(), &error); diff --git a/libtransmission/torrent.h b/libtransmission/torrent.h index 95f3ea38f..0f4f1c4ab 100644 --- a/libtransmission/torrent.h +++ b/libtransmission/torrent.h @@ -781,6 +781,8 @@ void tr_ctorSetBandwidthPriority(tr_ctor* ctor, tr_priority_t priority); tr_priority_t tr_ctorGetBandwidthPriority(tr_ctor const* ctor); tr_torrent::labels_t const& tr_ctorGetLabels(tr_ctor const* ctor); +void tr_torrentOnVerifyDone(tr_torrent* tor, bool aborted); + #define tr_logAddCriticalTor(tor, msg) tr_logAddCritical(msg, (tor)->name()) #define tr_logAddErrorTor(tor, msg) tr_logAddError(msg, (tor)->name()) #define tr_logAddWarnTor(tor, msg) tr_logAddWarn(msg, (tor)->name()) diff --git a/libtransmission/verify.cc b/libtransmission/verify.cc index afec7e9e6..f3fa530be 100644 --- a/libtransmission/verify.cc +++ b/libtransmission/verify.cc @@ -24,13 +24,39 @@ #include "utils.h" // tr_time(), tr_wait_msec() #include "verify.h" -/*** -**** -***/ +namespace +{ -static auto constexpr MsecToSleepPerSecondDuringVerify = int{ 100 }; +auto constexpr MsecToSleepPerSecondDuringVerify = int{ 100 }; -static bool verifyTorrent(tr_torrent* tor, bool const* stopFlag) +} + +int tr_verify_worker::Node::compare(tr_verify_worker::Node const& that) const +{ + // higher priority comes before lower priority + auto const pa = tr_torrentGetPriority(torrent); + auto const pb = tr_torrentGetPriority(that.torrent); + if (pa != pb) + { + return pa > pb ? -1 : 1; + } + + // smaller torrents come before larger ones because they verify faster + if (current_size != that.current_size) + { + return current_size < that.current_size ? -1 : 1; + } + + // tertiary compare just to ensure they don't compare equal + if (torrent->id() != that.torrent->id()) + { + return torrent->id() < that.torrent->id() ? -1 : 1; + } + + return 0; +} + +bool tr_verify_worker::verifyTorrent(tr_torrent* tor, bool const* stop_flag) { auto const begin = tr_time(); @@ -48,7 +74,7 @@ static bool verifyTorrent(tr_torrent* tor, bool const* stopFlag) tr_logAddDebugTor(tor, "verifying torrent..."); - while (!*stopFlag && piece < tor->pieceCount()) + while (!*stop_flag && piece < tor->pieceCount()) { auto const file_length = tor->fileSize(file_index); @@ -151,129 +177,74 @@ static bool verifyTorrent(tr_torrent* tor, bool const* stopFlag) return changed; } -/*** -**** -***/ - -struct verify_node -{ - tr_torrent* torrent; - tr_verify_done_func callback_func; - void* callback_data; - uint64_t current_size; - - [[nodiscard]] int compare(verify_node const& that) const - { - // higher priority comes before lower priority - auto const pa = tr_torrentGetPriority(torrent); - auto const pb = tr_torrentGetPriority(that.torrent); - if (pa != pb) - { - return pa > pb ? -1 : 1; - } - - // smaller torrents come before larger ones because they verify faster - if (current_size != that.current_size) - { - return current_size < that.current_size ? -1 : 1; - } - - // tertiary compare just to ensure they don't compare equal - if (torrent->infoHash() != that.torrent->infoHash()) - { - return torrent->infoHash() < that.torrent->infoHash() ? -1 : 1; - } - - return 0; - } - - bool operator<(verify_node const& that) const - { - return compare(that) < 0; - } -}; - -static struct verify_node currentNode; -// TODO: refactor s.t. this doesn't leak -static auto& verify_list{ *new std::set{} }; -static std::optional verify_thread_id; -static bool stopCurrent = false; - -static std::mutex verify_mutex_; - -static void verifyThreadFunc() +void tr_verify_worker::verifyThreadFunc() { for (;;) { { auto const lock = std::lock_guard(verify_mutex_); - stopCurrent = false; - if (std::empty(verify_list)) + stop_current_ = false; + if (std::empty(todo_)) { - currentNode.torrent = nullptr; - verify_thread_id.reset(); + current_node_.reset(); + verify_thread_id_.reset(); return; } - auto const it = std::begin(verify_list); - currentNode = *it; - verify_list.erase(it); + auto const it = std::begin(todo_); + current_node_ = *it; + todo_.erase(it); } - tr_torrent* tor = currentNode.torrent; + auto* const tor = current_node_->torrent; tr_logAddTraceTor(tor, "Verifying torrent"); tor->setVerifyState(TR_VERIFY_NOW); - auto const changed = verifyTorrent(tor, &stopCurrent); + auto const changed = verifyTorrent(tor, &stop_current_); tor->setVerifyState(TR_VERIFY_NONE); TR_ASSERT(tr_isTorrent(tor)); - if (!stopCurrent && changed) + if (!stop_current_ && changed) { tor->setDirty(); } - if (currentNode.callback_func != nullptr) - { - (*currentNode.callback_func)(tor, stopCurrent, currentNode.callback_data); - } + callCallback(tor, stop_current_); } } -void tr_verifyAdd(tr_torrent* tor, tr_verify_done_func callback_func, void* callback_data) +void tr_verify_worker::add(tr_torrent* tor) { TR_ASSERT(tr_isTorrent(tor)); tr_logAddTraceTor(tor, "Queued for verification"); - auto node = verify_node{}; + auto node = Node{}; node.torrent = tor; - node.callback_func = callback_func; - node.callback_data = callback_data; node.current_size = tor->hasTotal(); auto const lock = std::lock_guard(verify_mutex_); tor->setVerifyState(TR_VERIFY_WAIT); - verify_list.insert(node); + todo_.insert(node); - if (!verify_thread_id) + if (!verify_thread_id_) { - auto thread = std::thread(verifyThreadFunc); - verify_thread_id = thread.get_id(); + auto thread = std::thread(&tr_verify_worker::verifyThreadFunc, this); + verify_thread_id_ = thread.get_id(); thread.detach(); } } -void tr_verifyRemove(tr_torrent* tor) +void tr_verify_worker::remove(tr_torrent* tor) { TR_ASSERT(tr_isTorrent(tor)); verify_mutex_.lock(); - if (tor == currentNode.torrent) + if (current_node_ && current_node_->torrent == tor) { - stopCurrent = true; + stop_current_ = true; - while (stopCurrent) + while (stop_current_) { verify_mutex_.unlock(); tr_wait_msec(100); @@ -282,31 +253,32 @@ void tr_verifyRemove(tr_torrent* tor) } else { - auto const it = std::find_if( - std::begin(verify_list), - std::end(verify_list), + auto const iter = std::find_if( + std::begin(todo_), + std::end(todo_), [tor](auto const& task) { return tor == task.torrent; }); tor->setVerifyState(TR_VERIFY_NONE); - if (it != std::end(verify_list)) + if (iter != std::end(todo_)) { - if (it->callback_func != nullptr) - { - (*it->callback_func)(tor, true, it->callback_data); - } - - verify_list.erase(it); + callCallback(tor, true); + todo_.erase(iter); } } verify_mutex_.unlock(); } -void tr_verifyClose(tr_session* /*session*/) +tr_verify_worker::~tr_verify_worker() { auto const lock = std::lock_guard(verify_mutex_); - stopCurrent = true; - verify_list.clear(); + stop_current_ = true; + todo_.clear(); + + while (verify_thread_id_.has_value()) + { + tr_wait_msec(20); + } } diff --git a/libtransmission/verify.h b/libtransmission/verify.h index c2eef1e5e..2726865dc 100644 --- a/libtransmission/verify.h +++ b/libtransmission/verify.h @@ -9,20 +9,64 @@ #error only libtransmission should #include this header. #endif +#include +#include +#include +#include +#include +#include +#include + struct tr_session; struct tr_torrent; -/** - * @addtogroup file_io File IO - * @{ - */ +class tr_verify_worker +{ +public: + using callback_func = std::function; -using tr_verify_done_func = void (*)(tr_torrent*, bool aborted, void* user_data); + ~tr_verify_worker(); -void tr_verifyAdd(tr_torrent* tor, tr_verify_done_func callback_func, void* callback_data); + void addCallback(callback_func callback) + { + callbacks_.emplace_back(std::move(callback)); + } -void tr_verifyRemove(tr_torrent* tor); + void add(tr_torrent* tor); -void tr_verifyClose(tr_session*); + void remove(tr_torrent* tor); -/* @} */ +private: + struct Node + { + tr_torrent* torrent = nullptr; + uint64_t current_size = 0; + + [[nodiscard]] int compare(Node const& that) const; + + [[nodiscard]] bool operator<(Node const& that) const + { + return compare(that) < 0; + } + }; + + void callCallback(tr_torrent* tor, bool aborted) + { + for (auto& callback : callbacks_) + { + callback(tor, aborted); + } + } + + void verifyThreadFunc(); + [[nodiscard]] static bool verifyTorrent(tr_torrent* tor, bool const* stop_flag); + + std::list callbacks_; + std::mutex verify_mutex_; + + std::set todo_; + std::optional current_node_; + + std::optional verify_thread_id_; + bool stop_current_ = false; +}; diff --git a/tests/libtransmission/file-test.cc b/tests/libtransmission/file-test.cc index 736b67be6..1086ca55d 100644 --- a/tests/libtransmission/file-test.cc +++ b/tests/libtransmission/file-test.cc @@ -1388,9 +1388,13 @@ TEST_F(FileTest, dirOpen) odir = tr_sys_dir_open(test_dir); EXPECT_NE(TR_BAD_SYS_DIR, odir); auto files = std::set{}; - char const* filename = nullptr; - while ((filename = tr_sys_dir_read_name(odir, &err))) + for (;;) { + char const* const filename = tr_sys_dir_read_name(odir, &err); + if (filename == nullptr) + { + break; + } files.insert(filename); } EXPECT_EQ(3U, files.size()); diff --git a/tests/libtransmission/rename-test.cc b/tests/libtransmission/rename-test.cc index 27472f765..f8948539c 100644 --- a/tests/libtransmission/rename-test.cc +++ b/tests/libtransmission/rename-test.cc @@ -66,7 +66,7 @@ protected: sync(); } - static tr_torrent* createTorrentFromBase64Metainfo(tr_ctor* ctor, char const* benc_base64) + tr_torrent* createTorrentFromBase64Metainfo(tr_ctor* ctor, char const* benc_base64) { // create the torrent ctor auto const benc = tr_base64_decode(benc_base64); @@ -77,10 +77,8 @@ protected: tr_ctorSetPaused(ctor, TR_FORCE, true); // create the torrent - auto* const tor = tr_torrentNew(ctor, nullptr); + auto* const tor = createTorrentAndWaitForVerifyDone(ctor); EXPECT_NE(nullptr, tor); - - // cleanup return tor; } diff --git a/tests/libtransmission/test-fixtures.h b/tests/libtransmission/test-fixtures.h index b4c1ffb48..cc5da7de3 100644 --- a/tests/libtransmission/test-fixtures.h +++ b/tests/libtransmission/test-fixtures.h @@ -380,7 +380,19 @@ protected: Complete }; - tr_torrent* zeroTorrentInit(ZeroTorrentState state) const + [[nodiscard]] tr_torrent* createTorrentAndWaitForVerifyDone(tr_ctor* ctor) const + { + auto const n_previously_verified = std::size(verified_); + auto* const tor = tr_torrentNew(ctor, nullptr); + EXPECT_NE(nullptr, tor); + waitFor( + [this, tor, n_previously_verified]() + { return std::size(verified_) > n_previously_verified && verified_.back() == tor; }, + 20s); + return tor; + } + + [[nodiscard]] tr_torrent* zeroTorrentInit(ZeroTorrentState state) const { // 1048576 files-filled-with-zeroes/1048576 // 4096 files-filled-with-zeroes/4096 @@ -445,29 +457,21 @@ protected: } } - // create the torrent - auto* const tor = tr_torrentNew(ctor, nullptr); - EXPECT_NE(nullptr, tor); - waitForVerify(tor); - - // cleanup + auto* const tor = createTorrentAndWaitForVerifyDone(ctor); tr_ctorFree(ctor); return tor; } - void waitForVerify(tr_torrent* tor) const - { - EXPECT_NE(nullptr, tor->session); - tr_wait_msec(100); - EXPECT_TRUE(waitFor([tor]() { return tor->verifyState() == TR_VERIFY_NONE && tor->checked_pieces_.hasAll(); }, 4000)); - } - void blockingTorrentVerify(tr_torrent* tor) const { EXPECT_NE(nullptr, tor->session); EXPECT_FALSE(tr_amInEventThread(tor->session)); + auto const n_previously_verified = std::size(verified_); tr_torrentVerify(tor); - waitForVerify(tor); + waitFor( + [this, tor, n_previously_verified]() + { return std::size(verified_) > n_previously_verified && verified_.back() == tor; }, + 20s); } tr_session* session_ = nullptr; @@ -494,6 +498,7 @@ protected: SandboxedTest::SetUp(); session_ = sessionInit(settings()); + session_->verifier_->addCallback([this](tr_torrent* tor, bool /*aborted*/) { verified_.emplace_back(tor); }); } virtual void TearDown() override @@ -504,6 +509,9 @@ protected: SandboxedTest::TearDown(); } + +private: + std::vector verified_; }; } // namespace test