refactor: decouple tr_verify_worker from tr_torrent (#6123)
This commit is contained in:
parent
bb125b5e1e
commit
4bdb3066d8
|
@ -239,7 +239,7 @@ char const* torrentStop(tr_session* session, tr_variant* args_in, tr_variant* /*
|
|||
{
|
||||
for (auto* tor : getTorrents(session, args_in))
|
||||
{
|
||||
if (tor->is_running() || tor->is_queued() || tor->verify_state() != TR_VERIFY_NONE)
|
||||
if (tor->activity() != TR_STATUS_STOPPED)
|
||||
{
|
||||
tor->is_stopping_ = true;
|
||||
session->rpcNotify(TR_RPC_TORRENT_STOPPED, tor);
|
||||
|
|
|
@ -2004,6 +2004,24 @@ size_t tr_session::countQueueFreeSlots(tr_direction dir) const noexcept
|
|||
|
||||
// ---
|
||||
|
||||
void tr_session::verify_remove(tr_torrent const* const tor)
|
||||
{
|
||||
if (verifier_)
|
||||
{
|
||||
verifier_->remove(tor->info_hash());
|
||||
}
|
||||
}
|
||||
|
||||
void tr_session::verify_add(tr_torrent* const tor)
|
||||
{
|
||||
if (verifier_)
|
||||
{
|
||||
verifier_->add(std::make_unique<tr_torrent::VerifyMediator>(tor), tor->get_priority());
|
||||
}
|
||||
}
|
||||
|
||||
// ---
|
||||
|
||||
void tr_session::closeTorrentFiles(tr_torrent* tor) noexcept
|
||||
{
|
||||
this->cache->flush_torrent(tor);
|
||||
|
@ -2122,8 +2140,6 @@ tr_session::tr_session(std::string_view config_dir, tr_variant const& settings_d
|
|||
stats().save();
|
||||
});
|
||||
save_timer_->start_repeating(SaveIntervalSecs);
|
||||
|
||||
verifier_->add_callback(tr_torrentOnVerifyDone);
|
||||
}
|
||||
|
||||
void tr_session::addIncoming(tr_peer_socket&& socket)
|
||||
|
|
|
@ -888,21 +888,8 @@ public:
|
|||
return settings_.ratio_limit;
|
||||
}
|
||||
|
||||
void verifyRemove(tr_torrent* tor)
|
||||
{
|
||||
if (verifier_)
|
||||
{
|
||||
verifier_->remove(tor);
|
||||
}
|
||||
}
|
||||
|
||||
void verifyAdd(tr_torrent* tor)
|
||||
{
|
||||
if (verifier_)
|
||||
{
|
||||
verifier_->add(tor);
|
||||
}
|
||||
}
|
||||
void verify_add(tr_torrent* tor);
|
||||
void verify_remove(tr_torrent const* tor);
|
||||
|
||||
void fetch(tr_web::FetchOptions&& options) const
|
||||
{
|
||||
|
|
|
@ -60,6 +60,8 @@ struct tr_ctor
|
|||
|
||||
std::vector<char> contents;
|
||||
|
||||
tr_torrent::VerifyDoneCallback verify_done_callback_;
|
||||
|
||||
explicit tr_ctor(tr_session const* session_in)
|
||||
: session{ session_in }
|
||||
{
|
||||
|
@ -68,6 +70,20 @@ struct tr_ctor
|
|||
|
||||
// ---
|
||||
|
||||
void tr_ctorSetVerifyDoneCallback(tr_ctor* ctor, tr_torrent::VerifyDoneCallback&& callback)
|
||||
{
|
||||
ctor->verify_done_callback_ = std::move(callback);
|
||||
}
|
||||
|
||||
tr_torrent::VerifyDoneCallback tr_ctorStealVerifyDoneCallback(tr_ctor* ctor)
|
||||
{
|
||||
auto tmp = tr_torrent::VerifyDoneCallback{};
|
||||
std::swap(ctor->verify_done_callback_, tmp);
|
||||
return tmp;
|
||||
}
|
||||
|
||||
// ---
|
||||
|
||||
bool tr_ctorSetMetainfoFromFile(tr_ctor* ctor, std::string_view filename, tr_error** error)
|
||||
{
|
||||
if (std::empty(filename))
|
||||
|
|
|
@ -698,7 +698,7 @@ void removeTorrentInSessionThread(tr_torrent* tor, bool delete_flag, tr_fileFunc
|
|||
{
|
||||
// ensure the files are all closed and idle before moving
|
||||
tor->session->closeTorrentFiles(tor);
|
||||
tor->session->verifyRemove(tor);
|
||||
tor->session->verify_remove(tor);
|
||||
|
||||
if (delete_func == nullptr)
|
||||
{
|
||||
|
@ -829,7 +829,7 @@ void torrentStop(tr_torrent* const tor)
|
|||
tr_logAddInfoTor(tor, _("Pausing torrent"));
|
||||
}
|
||||
|
||||
tor->session->verifyRemove(tor);
|
||||
tor->session->verify_remove(tor);
|
||||
|
||||
tor->stopped_.emit(tor);
|
||||
tor->session->announcer_->stopTorrent(tor);
|
||||
|
@ -1172,6 +1172,7 @@ tr_torrent* tr_torrentNew(tr_ctor* ctor, tr_torrent** setme_duplicate_of)
|
|||
}
|
||||
|
||||
auto* const tor = new tr_torrent{ std::move(metainfo) };
|
||||
tor->verify_done_callback_ = tr_ctorStealVerifyDoneCallback(ctor);
|
||||
torrentInit(tor, ctor);
|
||||
return tor;
|
||||
}
|
||||
|
@ -1202,7 +1203,7 @@ void setLocationInSessionThread(
|
|||
|
||||
// ensure the files are all closed and idle before moving
|
||||
tor->session->closeTorrentFiles(tor);
|
||||
tor->session->verifyRemove(tor);
|
||||
tor->session->verify_remove(tor);
|
||||
|
||||
tr_error* error = nullptr;
|
||||
ok = tor->metainfo_.files().move(tor->current_dir(), path, setme_progress, tor->name(), &error);
|
||||
|
@ -1689,7 +1690,7 @@ void verifyTorrent(tr_torrent* const tor, bool force)
|
|||
}
|
||||
|
||||
/* if the torrent's already being verified, stop it */
|
||||
tor->session->verifyRemove(tor);
|
||||
tor->session->verify_remove(tor);
|
||||
|
||||
if (!tor->has_metainfo())
|
||||
{
|
||||
|
@ -1703,24 +1704,12 @@ void verifyTorrent(tr_torrent* const tor, bool force)
|
|||
|
||||
if (force || !setLocalErrorIfFilesDisappeared(tor))
|
||||
{
|
||||
tor->session->verifyAdd(tor);
|
||||
tor->session->verify_add(tor);
|
||||
}
|
||||
}
|
||||
} // namespace verify_helpers
|
||||
} // namespace
|
||||
|
||||
void tr_torrentOnVerifyDone(tr_torrent* tor, bool aborted)
|
||||
{
|
||||
using namespace verify_helpers;
|
||||
|
||||
if (aborted || tor->is_deleting_)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
tor->session->runInSessionThread(onVerifyDoneThreadFunc, tor);
|
||||
}
|
||||
|
||||
void tr_torrentVerify(tr_torrent* tor, bool force)
|
||||
{
|
||||
using namespace verify_helpers;
|
||||
|
@ -1728,13 +1717,86 @@ void tr_torrentVerify(tr_torrent* tor, bool force)
|
|||
tor->session->runInSessionThread(verifyTorrent, tor, force);
|
||||
}
|
||||
|
||||
void tr_torrent::set_verify_state(tr_verify_state state)
|
||||
void tr_torrent::set_verify_state(VerifyState const state)
|
||||
{
|
||||
TR_ASSERT(state == TR_VERIFY_NONE || state == TR_VERIFY_WAIT || state == TR_VERIFY_NOW);
|
||||
TR_ASSERT(state == VerifyState::None || state == VerifyState::Queued || state == VerifyState::Active);
|
||||
|
||||
this->verify_state_ = state;
|
||||
this->verify_progress_ = {};
|
||||
this->mark_changed();
|
||||
verify_state_ = state;
|
||||
verify_progress_ = {};
|
||||
mark_changed();
|
||||
}
|
||||
|
||||
tr_torrent_metainfo const& tr_torrent::VerifyMediator::metainfo() const
|
||||
{
|
||||
return tor_->metainfo_;
|
||||
}
|
||||
|
||||
std::optional<std::string> tr_torrent::VerifyMediator::find_file(tr_file_index_t const file_index) const
|
||||
{
|
||||
if (auto const found = tor_->find_file(file_index); found)
|
||||
{
|
||||
return std::string{ found->filename().sv() };
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
void tr_torrent::VerifyMediator::on_verify_queued()
|
||||
{
|
||||
tr_logAddTraceTor(tor_, "Queued for verification");
|
||||
tor_->set_verify_state(VerifyState::Queued);
|
||||
}
|
||||
|
||||
void tr_torrent::VerifyMediator::on_verify_started()
|
||||
{
|
||||
tr_logAddDebugTor(tor_, "Verifying torrent");
|
||||
time_started_ = tr_time();
|
||||
tor_->set_verify_state(VerifyState::Active);
|
||||
}
|
||||
|
||||
void tr_torrent::VerifyMediator::on_piece_checked(tr_piece_index_t const piece, bool const has_piece)
|
||||
{
|
||||
auto const had_piece = tor_->has_piece(piece);
|
||||
|
||||
if (has_piece || had_piece)
|
||||
{
|
||||
tor_->set_has_piece(piece, has_piece);
|
||||
tor_->set_dirty();
|
||||
}
|
||||
|
||||
tor_->checked_pieces_.set(piece, true);
|
||||
tor_->mark_changed();
|
||||
tor_->verify_progress_ = std::clamp(static_cast<float>(piece + 1U) / tor_->metainfo_.piece_count(), 0.0F, 1.0F);
|
||||
}
|
||||
|
||||
void tr_torrent::VerifyMediator::on_verify_done(bool const aborted)
|
||||
{
|
||||
using namespace verify_helpers;
|
||||
|
||||
if (time_started_.has_value())
|
||||
{
|
||||
auto const total_size = tor_->total_size();
|
||||
auto const duration_secs = tr_time() - *time_started_;
|
||||
tr_logAddDebugTor(
|
||||
tor_,
|
||||
fmt::format(
|
||||
"Verification is done. It took {} seconds to verify {} bytes ({} bytes per second)",
|
||||
duration_secs,
|
||||
total_size,
|
||||
total_size / (1 + duration_secs)));
|
||||
}
|
||||
|
||||
tor_->set_verify_state(VerifyState::None);
|
||||
|
||||
if (!aborted && !tor_->is_deleting_)
|
||||
{
|
||||
tor_->session->runInSessionThread(onVerifyDoneThreadFunc, tor_);
|
||||
}
|
||||
|
||||
if (tor_->verify_done_callback_)
|
||||
{
|
||||
tor_->verify_done_callback_(tor_);
|
||||
}
|
||||
}
|
||||
|
||||
// ---
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
|
||||
#include <cstddef> // size_t
|
||||
#include <ctime>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
|
@ -28,13 +29,14 @@
|
|||
#include "libtransmission/crypto-utils.h"
|
||||
#include "libtransmission/file-piece-map.h"
|
||||
#include "libtransmission/interned-string.h"
|
||||
#include "libtransmission/observable.h"
|
||||
#include "libtransmission/log.h"
|
||||
#include "libtransmission/observable.h"
|
||||
#include "libtransmission/session.h"
|
||||
#include "libtransmission/torrent-magnet.h"
|
||||
#include "libtransmission/torrent-metainfo.h"
|
||||
#include "libtransmission/tr-assert.h"
|
||||
#include "libtransmission/tr-macros.h"
|
||||
#include "libtransmission/verify.h"
|
||||
|
||||
class tr_swarm;
|
||||
struct tr_error;
|
||||
|
@ -73,17 +75,35 @@ void tr_torrentCheckSeedLimit(tr_torrent* tor);
|
|||
/** save a torrent's .resume file if it's changed since the last time it was saved */
|
||||
void tr_torrentSave(tr_torrent* tor);
|
||||
|
||||
enum tr_verify_state : uint8_t
|
||||
{
|
||||
TR_VERIFY_NONE,
|
||||
TR_VERIFY_WAIT,
|
||||
TR_VERIFY_NOW
|
||||
};
|
||||
|
||||
/** @brief Torrent object */
|
||||
struct tr_torrent final : public tr_completion::torrent_view
|
||||
{
|
||||
public:
|
||||
using VerifyDoneCallback = std::function<void(tr_torrent*)>;
|
||||
|
||||
class VerifyMediator : public tr_verify_worker::Mediator
|
||||
{
|
||||
public:
|
||||
explicit VerifyMediator(tr_torrent* const tor)
|
||||
: tor_{ tor }
|
||||
{
|
||||
}
|
||||
|
||||
~VerifyMediator() override = default;
|
||||
|
||||
[[nodiscard]] tr_torrent_metainfo const& metainfo() const override;
|
||||
[[nodiscard]] std::optional<std::string> find_file(tr_file_index_t file_index) const override;
|
||||
|
||||
void on_verify_queued() override;
|
||||
void on_verify_started() override;
|
||||
void on_piece_checked(tr_piece_index_t piece, bool has_piece) override;
|
||||
void on_verify_done(bool aborted) override;
|
||||
|
||||
private:
|
||||
tr_torrent* const tor_;
|
||||
std::optional<time_t> time_started_;
|
||||
};
|
||||
|
||||
explicit tr_torrent(tr_torrent_metainfo&& tm)
|
||||
: metainfo_{ std::move(tm) }
|
||||
, completion{ this, &this->metainfo_.block_info() }
|
||||
|
@ -581,21 +601,9 @@ public:
|
|||
|
||||
void refresh_current_dir();
|
||||
|
||||
void set_verify_state(tr_verify_state state);
|
||||
|
||||
[[nodiscard]] constexpr auto verify_state() const noexcept
|
||||
{
|
||||
return verify_state_;
|
||||
}
|
||||
|
||||
constexpr void set_verify_progress(float f) noexcept
|
||||
{
|
||||
verify_progress_ = f;
|
||||
}
|
||||
|
||||
[[nodiscard]] constexpr std::optional<float> verify_progress() const noexcept
|
||||
{
|
||||
if (verify_state_ == TR_VERIFY_NOW)
|
||||
if (verify_state_ == VerifyState::Active)
|
||||
{
|
||||
return verify_progress_;
|
||||
}
|
||||
|
@ -622,12 +630,12 @@ public:
|
|||
{
|
||||
bool const is_seed = this->is_done();
|
||||
|
||||
if (this->verify_state() == TR_VERIFY_NOW)
|
||||
if (verify_state_ == VerifyState::Active)
|
||||
{
|
||||
return TR_STATUS_CHECK;
|
||||
}
|
||||
|
||||
if (this->verify_state() == TR_VERIFY_WAIT)
|
||||
if (verify_state_ == VerifyState::Queued)
|
||||
{
|
||||
return TR_STATUS_CHECK_WAIT;
|
||||
}
|
||||
|
@ -992,6 +1000,14 @@ public:
|
|||
|
||||
private:
|
||||
friend tr_stat const* tr_torrentStat(tr_torrent* tor);
|
||||
friend tr_torrent* tr_torrentNew(tr_ctor* ctor, tr_torrent** setme_duplicate_of);
|
||||
|
||||
enum class VerifyState : uint8_t
|
||||
{
|
||||
None,
|
||||
Queued,
|
||||
Active
|
||||
};
|
||||
|
||||
// Tracks a torrent's error state, either local (e.g. file IO errors)
|
||||
// or tracker errors (e.g. warnings returned by a tracker).
|
||||
|
@ -1129,8 +1145,12 @@ private:
|
|||
}
|
||||
}
|
||||
|
||||
void set_verify_state(VerifyState state);
|
||||
|
||||
Error error_;
|
||||
|
||||
VerifyDoneCallback verify_done_callback_;
|
||||
|
||||
tr_interned_string bandwidth_group_;
|
||||
|
||||
SimpleSmoothedSpeed eta_speed_;
|
||||
|
@ -1153,7 +1173,7 @@ private:
|
|||
|
||||
tr_idlelimit idle_limit_mode_ = TR_IDLELIMIT_GLOBAL;
|
||||
|
||||
tr_verify_state verify_state_ = TR_VERIFY_NONE;
|
||||
VerifyState verify_state_ = VerifyState::None;
|
||||
|
||||
uint16_t idle_limit_minutes_ = 0;
|
||||
|
||||
|
@ -1183,7 +1203,8 @@ void tr_ctorSetBandwidthPriority(tr_ctor* ctor, tr_priority_t priority);
|
|||
tr_priority_t tr_ctorGetBandwidthPriority(tr_ctor const* ctor);
|
||||
tr_torrent::labels_t const& tr_ctorGetLabels(tr_ctor const* ctor);
|
||||
|
||||
void tr_torrentOnVerifyDone(tr_torrent* tor, bool aborted);
|
||||
void tr_ctorSetVerifyDoneCallback(tr_ctor* ctor, tr_torrent::VerifyDoneCallback&& callback);
|
||||
tr_torrent::VerifyDoneCallback tr_ctorStealVerifyDoneCallback(tr_ctor* ctor);
|
||||
|
||||
#define tr_logAddCriticalTor(tor, msg) tr_logAddCritical(msg, (tor)->name())
|
||||
#define tr_logAddErrorTor(tor, msg) tr_logAddError(msg, (tor)->name())
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
// License text can be found in the licenses/ folder.
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <ctime>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
|
@ -17,79 +18,49 @@
|
|||
#include "libtransmission/completion.h"
|
||||
#include "libtransmission/crypto-utils.h"
|
||||
#include "libtransmission/file.h"
|
||||
#include "libtransmission/log.h"
|
||||
#include "libtransmission/torrent.h"
|
||||
#include "libtransmission/tr-assert.h"
|
||||
#include "libtransmission/utils.h" // tr_time()
|
||||
#include "libtransmission/verify.h"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
auto constexpr SleepPerSecondDuringVerify = 100ms;
|
||||
|
||||
}
|
||||
|
||||
int tr_verify_worker::Node::compare(tr_verify_worker::Node const& that) const
|
||||
[[nodiscard]] auto current_time_secs()
|
||||
{
|
||||
// higher priority comes before lower priority
|
||||
auto const pa = tr_torrentGetPriority(torrent);
|
||||
auto const pb = tr_torrentGetPriority(that.torrent);
|
||||
if (auto const val = tr_compare_3way(pa, pb); val != 0)
|
||||
{
|
||||
return -val;
|
||||
}
|
||||
|
||||
// smaller torrents come before larger ones because they verify faster
|
||||
if (auto const val = tr_compare_3way(current_size, that.current_size); val != 0)
|
||||
{
|
||||
return val;
|
||||
}
|
||||
|
||||
// tertiary compare just to ensure they don't compare equal
|
||||
return tr_compare_3way(torrent->id(), that.torrent->id());
|
||||
return std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::steady_clock::now());
|
||||
}
|
||||
} // namespace
|
||||
|
||||
bool tr_verify_worker::verify_torrent(tr_torrent* tor, std::atomic<bool> const& stop_flag)
|
||||
void tr_verify_worker::verify_torrent(Mediator& verify_mediator, std::atomic<bool> const& abort_flag)
|
||||
{
|
||||
auto const begin = tr_time();
|
||||
verify_mediator.on_verify_started();
|
||||
|
||||
tr_sys_file_t fd = TR_BAD_SYS_FILE;
|
||||
uint64_t file_pos = 0;
|
||||
bool changed = false;
|
||||
bool had_piece = false;
|
||||
time_t last_slept_at = 0;
|
||||
uint32_t piece_pos = 0;
|
||||
tr_file_index_t file_index = 0;
|
||||
uint64_t file_pos = 0U;
|
||||
uint32_t piece_pos = 0U;
|
||||
tr_file_index_t file_index = 0U;
|
||||
tr_file_index_t prev_file_index = ~file_index;
|
||||
tr_piece_index_t piece = 0;
|
||||
auto buffer = std::vector<std::byte>(1024 * 256);
|
||||
tr_piece_index_t piece = 0U;
|
||||
auto buffer = std::vector<std::byte>(1024U * 256U);
|
||||
auto sha = tr_sha1::create();
|
||||
auto last_slept_at = current_time_secs();
|
||||
|
||||
tr_logAddDebugTor(tor, "verifying torrent...");
|
||||
|
||||
while (!stop_flag && piece < tor->piece_count())
|
||||
auto const& metainfo = verify_mediator.metainfo();
|
||||
while (!abort_flag && piece < metainfo.piece_count())
|
||||
{
|
||||
auto const file_length = tor->file_size(file_index);
|
||||
|
||||
/* if we're starting a new piece... */
|
||||
if (piece_pos == 0)
|
||||
{
|
||||
had_piece = tor->has_piece(piece);
|
||||
}
|
||||
auto const file_length = metainfo.file_size(file_index);
|
||||
|
||||
/* if we're starting a new file... */
|
||||
if (file_pos == 0 && fd == TR_BAD_SYS_FILE && file_index != prev_file_index)
|
||||
{
|
||||
auto const found = tor->find_file(file_index);
|
||||
fd = !found ? TR_BAD_SYS_FILE : tr_sys_file_open(found->filename(), TR_SYS_FILE_READ | TR_SYS_FILE_SEQUENTIAL, 0);
|
||||
auto const found = verify_mediator.find_file(file_index);
|
||||
fd = !found ? TR_BAD_SYS_FILE : tr_sys_file_open(found->c_str(), TR_SYS_FILE_READ | TR_SYS_FILE_SEQUENTIAL, 0);
|
||||
prev_file_index = file_index;
|
||||
}
|
||||
|
||||
/* figure out how much we can read this pass */
|
||||
uint64_t left_in_piece = tor->piece_size(piece) - piece_pos;
|
||||
uint64_t left_in_piece = metainfo.piece_size(piece) - piece_pos;
|
||||
uint64_t left_in_file = file_length - file_pos;
|
||||
uint64_t bytes_this_pass = std::min(left_in_file, left_in_piece);
|
||||
bytes_this_pass = std::min(bytes_this_pass, uint64_t(std::size(buffer)));
|
||||
|
@ -115,18 +86,12 @@ bool tr_verify_worker::verify_torrent(tr_torrent* tor, std::atomic<bool> const&
|
|||
/* if we're finishing a piece... */
|
||||
if (left_in_piece == 0)
|
||||
{
|
||||
if (auto const has_piece = sha->finish() == tor->piece_hash(piece); has_piece || had_piece)
|
||||
{
|
||||
tor->set_has_piece(piece, has_piece);
|
||||
changed |= has_piece != had_piece;
|
||||
}
|
||||
|
||||
tor->checked_pieces_.set(piece, true);
|
||||
tor->mark_changed();
|
||||
auto const has_piece = sha->finish() == metainfo.piece_hash(piece);
|
||||
verify_mediator.on_piece_checked(piece, has_piece);
|
||||
|
||||
/* sleeping even just a few msec per second goes a long
|
||||
* way towards reducing IO load... */
|
||||
if (auto const now = tr_time(); last_slept_at != now)
|
||||
if (auto const now = current_time_secs(); last_slept_at != now)
|
||||
{
|
||||
last_slept_at = now;
|
||||
std::this_thread::sleep_for(SleepPerSecondDuringVerify);
|
||||
|
@ -134,7 +99,6 @@ bool tr_verify_worker::verify_torrent(tr_torrent* tor, std::atomic<bool> const&
|
|||
|
||||
sha->clear();
|
||||
++piece;
|
||||
tor->set_verify_progress(piece / float(tor->piece_count()));
|
||||
piece_pos = 0;
|
||||
}
|
||||
|
||||
|
@ -158,17 +122,7 @@ bool tr_verify_worker::verify_torrent(tr_torrent* tor, std::atomic<bool> const&
|
|||
tr_sys_file_close(fd);
|
||||
}
|
||||
|
||||
/* stopwatch */
|
||||
time_t const end = tr_time();
|
||||
tr_logAddDebugTor(
|
||||
tor,
|
||||
fmt::format(
|
||||
"Verification is done. It took {} seconds to verify {} bytes ({} bytes per second)",
|
||||
end - begin,
|
||||
tor->total_size(),
|
||||
tor->total_size() / (1 + (end - begin))));
|
||||
|
||||
return changed;
|
||||
verify_mediator.on_verify_done(abort_flag);
|
||||
}
|
||||
|
||||
void tr_verify_worker::verify_thread_func()
|
||||
|
@ -176,7 +130,7 @@ void tr_verify_worker::verify_thread_func()
|
|||
for (;;)
|
||||
{
|
||||
{
|
||||
auto const lock = std::lock_guard(verify_mutex_);
|
||||
auto const lock = std::lock_guard{ verify_mutex_ };
|
||||
|
||||
if (stop_current_)
|
||||
{
|
||||
|
@ -191,39 +145,19 @@ void tr_verify_worker::verify_thread_func()
|
|||
return;
|
||||
}
|
||||
|
||||
auto const it = std::begin(todo_);
|
||||
current_node_ = *it;
|
||||
todo_.erase(it);
|
||||
current_node_ = std::move(todo_.extract(std::begin(todo_)).value());
|
||||
}
|
||||
|
||||
auto* const tor = current_node_->torrent;
|
||||
tr_logAddTraceTor(tor, "Verifying torrent");
|
||||
tor->set_verify_state(TR_VERIFY_NOW);
|
||||
auto const changed = verify_torrent(tor, stop_current_);
|
||||
tor->set_verify_state(TR_VERIFY_NONE);
|
||||
TR_ASSERT(tr_isTorrent(tor));
|
||||
|
||||
if (!stop_current_ && changed)
|
||||
{
|
||||
tor->set_dirty();
|
||||
}
|
||||
|
||||
call_callback(tor, stop_current_);
|
||||
verify_torrent(*current_node_->mediator_, stop_current_);
|
||||
}
|
||||
}
|
||||
|
||||
void tr_verify_worker::add(tr_torrent* tor)
|
||||
void tr_verify_worker::add(std::unique_ptr<Mediator> mediator, tr_priority_t priority)
|
||||
{
|
||||
TR_ASSERT(tr_isTorrent(tor));
|
||||
tr_logAddTraceTor(tor, "Queued for verification");
|
||||
auto const lock = std::lock_guard{ verify_mutex_ };
|
||||
|
||||
auto node = Node{};
|
||||
node.torrent = tor;
|
||||
node.current_size = tor->has_total();
|
||||
|
||||
auto const lock = std::lock_guard(verify_mutex_);
|
||||
tor->set_verify_state(TR_VERIFY_WAIT);
|
||||
todo_.insert(node);
|
||||
mediator->on_verify_queued();
|
||||
todo_.emplace(std::move(mediator), priority);
|
||||
|
||||
if (!verify_thread_id_)
|
||||
{
|
||||
|
@ -233,38 +167,30 @@ void tr_verify_worker::add(tr_torrent* tor)
|
|||
}
|
||||
}
|
||||
|
||||
void tr_verify_worker::remove(tr_torrent* tor)
|
||||
void tr_verify_worker::remove(tr_sha1_digest_t const& info_hash)
|
||||
{
|
||||
TR_ASSERT(tr_isTorrent(tor));
|
||||
|
||||
auto lock = std::unique_lock(verify_mutex_);
|
||||
|
||||
if (current_node_ && current_node_->torrent == tor)
|
||||
if (current_node_ && current_node_->matches(info_hash))
|
||||
{
|
||||
stop_current_ = true;
|
||||
stop_current_cv_.wait(lock, [this]() { return !stop_current_; });
|
||||
}
|
||||
else
|
||||
else if (auto const iter = std::find_if(
|
||||
std::begin(todo_),
|
||||
std::end(todo_),
|
||||
[&info_hash](auto const& node) { return node.matches(info_hash); });
|
||||
iter != std::end(todo_))
|
||||
{
|
||||
auto const iter = std::find_if(
|
||||
std::begin(todo_),
|
||||
std::end(todo_),
|
||||
[tor](auto const& task) { return tor == task.torrent; });
|
||||
|
||||
tor->set_verify_state(TR_VERIFY_NONE);
|
||||
|
||||
if (iter != std::end(todo_))
|
||||
{
|
||||
call_callback(tor, true);
|
||||
todo_.erase(iter);
|
||||
}
|
||||
iter->mediator_->on_verify_done(true /*aborted*/);
|
||||
todo_.erase(iter);
|
||||
}
|
||||
}
|
||||
|
||||
tr_verify_worker::~tr_verify_worker()
|
||||
{
|
||||
{
|
||||
auto const lock = std::lock_guard(verify_mutex_);
|
||||
auto const lock = std::lock_guard{ verify_mutex_ };
|
||||
stop_current_ = true;
|
||||
todo_.clear();
|
||||
}
|
||||
|
@ -274,3 +200,30 @@ tr_verify_worker::~tr_verify_worker()
|
|||
std::this_thread::sleep_for(20ms);
|
||||
}
|
||||
}
|
||||
|
||||
int tr_verify_worker::Node::compare(Node const& that) const noexcept
|
||||
{
|
||||
// prefer higher-priority torrents
|
||||
if (priority_ != that.priority_)
|
||||
{
|
||||
return priority_ > that.priority_ ? -1 : 1;
|
||||
}
|
||||
|
||||
// prefer smaller torrents, since they will verify faster
|
||||
auto const& metainfo = mediator_->metainfo();
|
||||
auto const& that_metainfo = that.mediator_->metainfo();
|
||||
if (metainfo.total_size() != that_metainfo.total_size())
|
||||
{
|
||||
return metainfo.total_size() < that_metainfo.total_size() ? -1 : 1;
|
||||
}
|
||||
|
||||
// uniqueness check
|
||||
auto const& this_hash = metainfo.info_hash();
|
||||
auto const& that_hash = that_metainfo.info_hash();
|
||||
if (this_hash != that_hash)
|
||||
{
|
||||
return this_hash < that_hash ? -1 : 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -14,56 +14,66 @@
|
|||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <set>
|
||||
#include <thread>
|
||||
|
||||
struct tr_session;
|
||||
struct tr_torrent;
|
||||
#include "libtransmission/torrent-metainfo.h"
|
||||
|
||||
class tr_verify_worker
|
||||
{
|
||||
public:
|
||||
using callback_func = std::function<void(tr_torrent*, bool aborted)>;
|
||||
class Mediator
|
||||
{
|
||||
public:
|
||||
virtual ~Mediator() = default;
|
||||
|
||||
[[nodiscard]] virtual tr_torrent_metainfo const& metainfo() const = 0;
|
||||
[[nodiscard]] virtual std::optional<std::string> find_file(tr_file_index_t file_index) const = 0;
|
||||
|
||||
virtual void on_verify_queued() = 0;
|
||||
virtual void on_verify_started() = 0;
|
||||
virtual void on_piece_checked(tr_piece_index_t piece, bool has_piece) = 0;
|
||||
virtual void on_verify_done(bool aborted) = 0;
|
||||
};
|
||||
|
||||
~tr_verify_worker();
|
||||
|
||||
void add_callback(callback_func callback)
|
||||
{
|
||||
callbacks_.emplace_back(std::move(callback));
|
||||
}
|
||||
void add(std::unique_ptr<Mediator> mediator, tr_priority_t priority);
|
||||
|
||||
void add(tr_torrent* tor);
|
||||
|
||||
void remove(tr_torrent* tor);
|
||||
void remove(tr_sha1_digest_t const& info_hash);
|
||||
|
||||
private:
|
||||
struct Node
|
||||
{
|
||||
tr_torrent* torrent = nullptr;
|
||||
uint64_t current_size = 0;
|
||||
Node(std::unique_ptr<Mediator> mediator, tr_priority_t priority) noexcept
|
||||
: mediator_{ std::move(mediator) }
|
||||
, priority_{ priority }
|
||||
{
|
||||
}
|
||||
|
||||
[[nodiscard]] int compare(Node const& that) const;
|
||||
[[nodiscard]] int compare(Node const& that) const noexcept; // <=>
|
||||
|
||||
[[nodiscard]] bool operator<(Node const& that) const
|
||||
[[nodiscard]] auto operator<(Node const& that) const noexcept
|
||||
{
|
||||
return compare(that) < 0;
|
||||
}
|
||||
|
||||
[[nodiscard]] bool matches(tr_sha1_digest_t const& info_hash) const noexcept
|
||||
{
|
||||
return mediator_->metainfo().info_hash() == info_hash;
|
||||
}
|
||||
|
||||
std::unique_ptr<Mediator> mediator_;
|
||||
tr_priority_t priority_;
|
||||
};
|
||||
|
||||
void call_callback(tr_torrent* tor, bool aborted) const
|
||||
{
|
||||
for (auto const& callback : callbacks_)
|
||||
{
|
||||
callback(tor, aborted);
|
||||
}
|
||||
}
|
||||
static void verify_torrent(Mediator& verify_mediator, std::atomic<bool> const& abort_flag);
|
||||
|
||||
void verify_thread_func();
|
||||
[[nodiscard]] static bool verify_torrent(tr_torrent* tor, std::atomic<bool> const& stop_flag);
|
||||
|
||||
std::list<callback_func> callbacks_;
|
||||
std::mutex verify_mutex_;
|
||||
|
||||
std::set<Node> todo_;
|
||||
|
|
|
@ -396,8 +396,17 @@ protected:
|
|||
{
|
||||
auto verified_lock = std::unique_lock(verified_mutex_);
|
||||
auto const n_previously_verified = std::size(verified_);
|
||||
auto* const tor = tr_torrentNew(ctor, nullptr);
|
||||
|
||||
tr_ctorSetVerifyDoneCallback(
|
||||
ctor,
|
||||
[this](tr_torrent* const tor)
|
||||
{
|
||||
auto verified_lock = std::lock_guard{ verified_mutex_ };
|
||||
verified_.emplace_back(tor);
|
||||
verified_cv_.notify_one();
|
||||
});
|
||||
|
||||
auto* const tor = tr_torrentNew(ctor, nullptr);
|
||||
auto const stop_waiting = [this, tor, n_previously_verified]()
|
||||
{
|
||||
return std::size(verified_) > n_previously_verified && verified_.back() == tor;
|
||||
|
@ -514,15 +523,7 @@ protected:
|
|||
|
||||
init_mgr_ = tr_lib_init();
|
||||
|
||||
auto callback = [this](tr_torrent* tor, bool /*aborted*/)
|
||||
{
|
||||
auto verified_lock = std::scoped_lock(verified_mutex_);
|
||||
verified_.emplace_back(tor);
|
||||
verified_cv_.notify_one();
|
||||
};
|
||||
|
||||
session_ = sessionInit(*settings());
|
||||
session_->verifier_->add_callback(callback);
|
||||
}
|
||||
|
||||
virtual void TearDown() override
|
||||
|
|
Loading…
Reference in New Issue