diff --git a/libtransmission/peer-mgr.cc b/libtransmission/peer-mgr.cc index 9cf922aaa..3f19c4a92 100644 --- a/libtransmission/peer-mgr.cc +++ b/libtransmission/peer-mgr.cc @@ -615,7 +615,7 @@ public: ActiveRequests active_requests; // depends-on: active_requests - std::vector> webseeds; + std::vector> webseeds; // depends-on: active_requests Peers peers; @@ -645,7 +645,7 @@ private: webseeds.reserve(n); for (size_t i = 0; i < n; ++i) { - webseeds.emplace_back(tr_webseedNew(*tor, tor->webseed(i), &tr_swarm::peer_callback_webseed, this)); + webseeds.emplace_back(tr_webseed::create(*tor, tor->webseed(i), &tr_swarm::peer_callback_webseed, this)); } webseeds.shrink_to_fit(); @@ -1719,7 +1719,7 @@ tr_webseed_view tr_peerMgrWebseed(tr_torrent const* tor, size_t i) size_t const n = std::size(tor->swarm->webseeds); TR_ASSERT(i < n); - return i >= n ? tr_webseed_view{} : tr_webseedView(tor->swarm->webseeds[i].get()); + return i >= n ? tr_webseed_view{} : tor->swarm->webseeds[i]->get_view(); } namespace diff --git a/libtransmission/webseed.cc b/libtransmission/webseed.cc index d6ab51a43..9c1a133de 100644 --- a/libtransmission/webseed.cc +++ b/libtransmission/webseed.cc @@ -45,40 +45,45 @@ using namespace libtransmission::Values; namespace { -class tr_webseed; - -void on_idle(tr_webseed* w); +class tr_webseed_impl; class tr_webseed_task { -private: - libtransmission::evhelpers::evbuffer_unique_ptr const content_{ evbuffer_new() }; - public: - tr_webseed_task(tr_torrent* tor, tr_webseed* webseed_in, tr_block_span_t blocks_in) - : webseed{ webseed_in } - , session{ tor->session } - , blocks{ blocks_in } - , end_byte{ tor->block_loc(blocks.end - 1).byte + tor->block_size(blocks.end - 1) } - , loc{ tor->block_loc(blocks.begin) } + tr_webseed_task(tr_torrent const& tor, tr_webseed_impl* webseed_in, tr_block_span_t blocks_in) + : blocks{ blocks_in } + , webseed_{ webseed_in } + , session_{ tor.session } + , end_byte_{ tor.block_loc(blocks.end - 1).byte + tor.block_size(blocks.end - 1) } + , loc_{ tor.block_loc(blocks.begin) } { + evbuffer_add_cb(content_.get(), on_buffer_got_data, this); } - tr_webseed* const webseed; - [[nodiscard]] auto* content() const { return content_.get(); } - tr_session* const session; - tr_block_span_t const blocks; - uint64_t const end_byte; - - // the current position in the task; i.e., the next block to save - tr_block_info::Location loc; + void request_next_chunk(); bool dead = false; + tr_block_span_t const blocks; + +private: + void use_fetched_blocks(); + + static void on_partial_data_fetched(tr_web::FetchResponse const& web_response); + static void on_buffer_got_data(evbuffer* /*buf*/, evbuffer_cb_info const* info, void* vtask); + + tr_webseed_impl* const webseed_; + tr_session* const session_; + uint64_t const end_byte_; + + // the current position in the task; i.e., the next block to save + tr_block_info::Location loc_; + + libtransmission::evhelpers::evbuffer_unique_ptr const content_{ evbuffer_new() }; }; /** @@ -93,37 +98,37 @@ public: class ConnectionLimiter { public: - constexpr void taskStarted() noexcept + constexpr void task_started() noexcept { ++n_tasks; } - void taskFinished(bool success) + void task_finished(bool success) { if (!success) { - taskFailed(); + task_failed(); } TR_ASSERT(n_tasks > 0); --n_tasks; } - constexpr void gotData() noexcept + constexpr void got_data() noexcept { TR_ASSERT(n_tasks > 0); n_consecutive_failures = 0; paused_until = 0; } - [[nodiscard]] size_t slotsAvailable() const noexcept + [[nodiscard]] size_t slots_available() const noexcept { - if (isPaused()) + if (is_paused()) { return 0; } - auto const max = maxConnections(); + auto const max = max_connections(); if (n_tasks >= max) { return 0; @@ -133,17 +138,17 @@ public: } private: - [[nodiscard]] bool isPaused() const noexcept + [[nodiscard]] bool is_paused() const noexcept { return paused_until > tr_time(); } - [[nodiscard]] constexpr size_t maxConnections() const noexcept + [[nodiscard]] constexpr size_t max_connections() const noexcept { return n_consecutive_failures > 0 ? 1 : MaxConnections; } - void taskFailed() + void task_failed() { TR_ASSERT(n_tasks > 0); @@ -153,19 +158,16 @@ private: } } - static time_t constexpr TimeoutIntervalSecs = 120; - static size_t constexpr MaxConnections = 4; - static size_t constexpr MaxConsecutiveFailures = MaxConnections; + static auto constexpr TimeoutIntervalSecs = time_t{ 120 }; + static auto constexpr MaxConnections = size_t{ 4 }; + static auto constexpr MaxConsecutiveFailures = MaxConnections; size_t n_tasks = 0; size_t n_consecutive_failures = 0; time_t paused_until = 0; }; -void task_request_next_chunk(tr_webseed_task* task); -void onBufferGotData(evbuffer* /*buf*/, evbuffer_cb_info const* info, void* vtask); - -class tr_webseed final : public tr_peer +class tr_webseed_impl final : public tr_webseed { public: struct RequestLimit @@ -178,42 +180,44 @@ public: size_t max_blocks = 0; }; - tr_webseed(tr_torrent& tor, std::string_view url, tr_peer_callback_webseed callback_in, void* callback_data_in) - : tr_peer{ tor } - , torrent_id{ tor.id() } + tr_webseed_impl(tr_torrent& tor_in, std::string_view url, tr_peer_callback_webseed callback_in, void* callback_data_in) + : tr_webseed{ tor_in } + , tor{ tor_in } , base_url{ url } - , callback{ callback_in } - , callback_data{ callback_data_in } - , idle_timer_{ session->timerMaker().create([this]() { on_idle(this); }) } - , have_{ tor.piece_count() } - , bandwidth_{ &tor.bandwidth() } + , idle_timer_{ session->timerMaker().create([this]() { on_idle(); }) } + , have_{ tor_in.piece_count() } + , bandwidth_{ &tor_in.bandwidth() } + , callback_{ callback_in } + , callback_data_{ callback_data_in } { have_.set_has_all(); idle_timer_->start_repeating(IdleTimerInterval); } - tr_webseed(tr_webseed&&) = delete; - tr_webseed(tr_webseed const&) = delete; - tr_webseed& operator=(tr_webseed&&) = delete; - tr_webseed& operator=(tr_webseed const&) = delete; + tr_webseed_impl(tr_webseed_impl&&) = delete; + tr_webseed_impl(tr_webseed_impl const&) = delete; + tr_webseed_impl& operator=(tr_webseed_impl&&) = delete; + tr_webseed_impl& operator=(tr_webseed_impl const&) = delete; - ~tr_webseed() override + ~tr_webseed_impl() override { // flag all the pending tasks as dead std::for_each(std::begin(tasks), std::end(tasks), [](auto* task) { task->dead = true; }); tasks.clear(); } - [[nodiscard]] tr_torrent* getTorrent() const - { - return tr_torrentFindFromId(session, torrent_id); - } - [[nodiscard]] Speed get_piece_speed(uint64_t now, tr_direction dir) const override { return dir == TR_DOWN ? bandwidth_.get_piece_speed(now, dir) : Speed{}; } + [[nodiscard]] tr_webseed_view get_view() const override + { + auto const is_downloading = !std::empty(tasks); + auto const speed = get_piece_speed(tr_time_msec(), TR_DOWN); + return { base_url.c_str(), is_downloading, speed.base_quantity() }; + } + [[nodiscard]] TR_CONSTEXPR20 size_t active_req_count(tr_direction dir) const noexcept override { if (dir == TR_CLIENT_TO_PEER) // blocks we've requested @@ -231,7 +235,7 @@ public: [[nodiscard]] std::string display_name() const override { - if (auto const parsed = tr_urlParse(base_url); parsed) + if (auto const parsed = tr_urlParse(base_url)) { return fmt::format("{:s}:{:d}", parsed->host, parsed->port); } @@ -244,26 +248,24 @@ public: return have_; } - void gotPieceData(uint32_t n_bytes) + void got_piece_data(uint32_t n_bytes) { bandwidth_.notify_bandwidth_consumed(TR_DOWN, n_bytes, true, tr_time_msec()); publish(tr_peer_event::GotPieceData(n_bytes)); - connection_limiter.gotData(); + connection_limiter.got_data(); } - void publishRejection(tr_block_span_t block_span) + void publish_rejection(tr_block_span_t block_span) { - auto const* const tor = getTorrent(); for (auto block = block_span.begin; block < block_span.end; ++block) { - publish(tr_peer_event::GotRejected(tor->block_info(), block)); + publish(tr_peer_event::GotRejected(tor.block_info(), block)); } } void request_blocks(tr_block_span_t const* block_spans, size_t n_spans) override { - auto* const tor = getTorrent(); - if (tor == nullptr || !tor->is_running() || tor->is_done()) + if (!tor.is_running() || tor.is_done()) { return; } @@ -271,23 +273,41 @@ public: for (auto const *span = block_spans, *end = span + n_spans; span != end; ++span) { auto* const task = new tr_webseed_task{ tor, this, *span }; - evbuffer_add_cb(task->content(), onBufferGotData, task); tasks.insert(task); - task_request_next_chunk(task); + task->request_next_chunk(); - tr_peerMgrClientSentRequests(tor, this, *span); + tr_peerMgrClientSentRequests(&tor, this, *span); } } + void on_idle() + { + auto const [max_spans, max_blocks] = max_available_reqs(); + if (max_spans == 0 || max_blocks == 0) + { + return; + } + + // Prefer to request large, contiguous chunks from webseeds. + // The actual value of '64' is arbitrary here; we could probably + // be smarter about this. + auto spans = tr_peerMgrGetNextRequests(&tor, this, max_blocks); + if (std::size(spans) > max_spans) + { + spans.resize(max_spans); + } + request_blocks(std::data(spans), std::size(spans)); + } + [[nodiscard]] RequestLimit max_available_reqs() const noexcept { - auto const n_slots = connection_limiter.slotsAvailable(); + auto const n_slots = connection_limiter.slots_available(); if (n_slots == 0) { return {}; } - if (auto const* const tor = getTorrent(); tor == nullptr || !tor->is_running() || tor->is_done()) + if (!tor.is_running() || tor.is_done()) { return {}; } @@ -301,16 +321,14 @@ public: void publish(tr_peer_event const& peer_event) { - if (callback != nullptr) + if (callback_ != nullptr) { - (*callback)(this, peer_event, callback_data); + (*callback_)(this, peer_event, callback_data_); } } - tr_torrent_id_t const torrent_id; + tr_torrent& tor; std::string const base_url; - tr_peer_callback_webseed const callback; - void* const callback_data; ConnectionLimiter connection_limiter; std::set tasks; @@ -323,92 +341,57 @@ private: tr_bitfield have_; tr_bandwidth bandwidth_; + + tr_peer_callback_webseed const callback_; + void* const callback_data_; }; // --- -struct write_block_data +void tr_webseed_task::use_fetched_blocks() { -private: - libtransmission::evhelpers::evbuffer_unique_ptr const content_{ evbuffer_new() }; + auto const lock = session_->unique_lock(); -public: - write_block_data( - tr_session* session, - tr_torrent_id_t tor_id, - tr_block_index_t block, - std::unique_ptr data, - tr_webseed* webseed) - : session_{ session } - , tor_id_{ tor_id } - , block_{ block } - , data_{ std::move(data) } - , webseed_{ webseed } + auto const& tor = webseed_->tor; + + for (auto* const buf = content();;) { - } - - void write_block_func() - { - if (auto const* const tor = tr_torrentFindFromId(session_, tor_id_); tor != nullptr) - { - session_->cache->write_block(tor_id_, block_, std::move(data_)); - webseed_->publish(tr_peer_event::GotBlock(tor->block_info(), block_)); - } - - delete this; - } - -private: - tr_session* const session_; - tr_torrent_id_t const tor_id_; - tr_block_index_t const block_; - std::unique_ptr data_; - tr_webseed* const webseed_; -}; - -void useFetchedBlocks(tr_webseed_task* task) -{ - auto* const session = task->session; - auto const lock = session->unique_lock(); - - auto* const webseed = task->webseed; - auto const* const tor = webseed->getTorrent(); - if (tor == nullptr) - { - return; - } - - auto* const buf = task->content(); - for (;;) - { - auto const block_size = tor->block_size(task->loc.block); + auto const block_size = tor.block_size(loc_.block); if (evbuffer_get_length(buf) < block_size) { break; } - if (tor->has_block(task->loc.block)) + if (tor.has_block(loc_.block)) { evbuffer_drain(buf, block_size); } else { - auto block_buf = std::make_unique(block_size); - evbuffer_remove(task->content(), std::data(*block_buf), std::size(*block_buf)); - auto* const data = new write_block_data{ session, tor->id(), task->loc.block, std::move(block_buf), webseed }; - session->run_in_session_thread(&write_block_data::write_block_func, data); + auto block_buf = new Cache::BlockData(block_size); + evbuffer_remove(buf, std::data(*block_buf), std::size(*block_buf)); + session_->run_in_session_thread( + [session = session_, tor_id = tor.id(), block = loc_.block, block_buf, webseed = webseed_]() + { + auto data = std::unique_ptr{ block_buf }; + if (auto const* const torrent = tr_torrentFindFromId(session, tor_id); torrent != nullptr) + { + session->cache->write_block(tor_id, block, std::move(data)); + webseed->publish(tr_peer_event::GotBlock(torrent->block_info(), block)); + } + }); } - task->loc = tor->byte_loc(task->loc.byte + block_size); + loc_ = tor.byte_loc(loc_.byte + block_size); - TR_ASSERT(task->loc.byte <= task->end_byte); - TR_ASSERT(task->loc.byte == task->end_byte || task->loc.block_offset == 0); + TR_ASSERT(loc_.byte <= end_byte_); + TR_ASSERT(loc_.byte == end_byte_ || loc_.block_offset == 0); } } // --- -void onBufferGotData(evbuffer* /*buf*/, evbuffer_cb_info const* info, void* vtask) +void tr_webseed_task::on_buffer_got_data(evbuffer* /*buf*/, evbuffer_cb_info const* info, void* vtask) { size_t const n_added = info->n_added; auto* const task = static_cast(vtask); @@ -417,33 +400,14 @@ void onBufferGotData(evbuffer* /*buf*/, evbuffer_cb_info const* info, void* vtas return; } - auto const lock = task->session->unique_lock(); - task->webseed->gotPieceData(n_added); + auto const lock = task->session_->unique_lock(); + task->webseed_->got_piece_data(n_added); } -void on_idle(tr_webseed* webseed) -{ - auto const [max_spans, max_blocks] = webseed->max_available_reqs(); - if (max_spans == 0 || max_blocks == 0) - { - return; - } - - // Prefer to request large, contiguous chunks from webseeds. - // The actual value of '64' is arbitrary here; we could probably - // be smarter about this. - auto spans = tr_peerMgrGetNextRequests(webseed->getTorrent(), webseed, max_blocks); - if (std::size(spans) > max_spans) - { - spans.resize(max_spans); - } - webseed->request_blocks(std::data(spans), std::size(spans)); -} - -void onPartialDataFetched(tr_web::FetchResponse const& web_response) +void tr_webseed_task::on_partial_data_fetched(tr_web::FetchResponse const& web_response) { auto const& [status, body, primary_ip, did_connect, did_timeout, vtask] = web_response; - bool const success = status == 206; + auto const success = status == 206; auto* const task = static_cast(vtask); @@ -453,43 +417,38 @@ void onPartialDataFetched(tr_web::FetchResponse const& web_response) return; } - auto* const webseed = task->webseed; - webseed->connection_limiter.taskFinished(success); - - if (auto const* const tor = webseed->getTorrent(); tor == nullptr) - { - return; - } + auto* const webseed = task->webseed_; + webseed->connection_limiter.task_finished(success); if (!success) { - webseed->publishRejection({ task->loc.block, task->blocks.end }); + webseed->publish_rejection({ task->loc_.block, task->blocks.end }); webseed->tasks.erase(task); delete task; return; } - useFetchedBlocks(task); + task->use_fetched_blocks(); - if (task->loc.byte < task->end_byte) + if (task->loc_.byte < task->end_byte_) { // Request finished successfully but there's still data missing. // That means we've reached the end of a file and need to request // the next one - task_request_next_chunk(task); + task->request_next_chunk(); return; } TR_ASSERT(evbuffer_get_length(task->content()) == 0); - TR_ASSERT(task->loc.byte == task->end_byte); + TR_ASSERT(task->loc_.byte == task->end_byte_); webseed->tasks.erase(task); delete task; - on_idle(webseed); + webseed->on_idle(); } template -void makeUrl(tr_webseed const* const webseed, std::string_view name, OutputIt out) +void makeUrl(tr_webseed_impl const* const webseed, std::string_view name, OutputIt out) { auto const& url = webseed->base_url; @@ -501,52 +460,38 @@ void makeUrl(tr_webseed const* const webseed, std::string_view name, OutputIt ou } } -void task_request_next_chunk(tr_webseed_task* task) +void tr_webseed_task::request_next_chunk() { - auto* const webseed = task->webseed; - auto const* const tor = webseed->getTorrent(); - if (tor == nullptr) - { - return; - } + auto const& tor = webseed_->tor; - auto const loc = tor->byte_loc(task->loc.byte + evbuffer_get_length(task->content())); + auto const downloaded_loc = tor.byte_loc(loc_.byte + evbuffer_get_length(content())); - auto const [file_index, file_offset] = tor->file_offset(loc); - auto const left_in_file = tor->file_size(file_index) - file_offset; - auto const left_in_task = task->end_byte - loc.byte; + auto const [file_index, file_offset] = tor.file_offset(downloaded_loc); + auto const left_in_file = tor.file_size(file_index) - file_offset; + auto const left_in_task = end_byte_ - downloaded_loc.byte; auto const this_chunk = std::min(left_in_file, left_in_task); TR_ASSERT(this_chunk > 0U); - webseed->connection_limiter.taskStarted(); + webseed_->connection_limiter.task_started(); auto url = tr_urlbuf{}; - makeUrl(webseed, tor->file_subpath(file_index), std::back_inserter(url)); - auto options = tr_web::FetchOptions{ url.sv(), onPartialDataFetched, task }; + makeUrl(webseed_, tor.file_subpath(file_index), std::back_inserter(url)); + auto options = tr_web::FetchOptions{ url.sv(), on_partial_data_fetched, this }; options.range = fmt::format("{:d}-{:d}", file_offset, file_offset + this_chunk - 1); - options.speed_limit_tag = tor->id(); - options.buffer = task->content(); - tor->session->fetch(std::move(options)); + options.speed_limit_tag = tor.id(); + options.buffer = content(); + tor.session->fetch(std::move(options)); } } // namespace // --- -tr_peer* tr_webseedNew(tr_torrent& torrent, std::string_view url, tr_peer_callback_webseed callback, void* callback_data) +std::unique_ptr tr_webseed::create( + tr_torrent& torrent, + std::string_view url, + tr_peer_callback_webseed callback, + void* callback_data) { - return new tr_webseed{ torrent, url, callback, callback_data }; -} - -tr_webseed_view tr_webseedView(tr_peer const* peer) -{ - auto const* const webseed = dynamic_cast(peer); - if (webseed == nullptr) - { - return {}; - } - - auto const is_downloading = !std::empty(webseed->tasks); - auto const speed = peer->get_piece_speed(tr_time_msec(), TR_DOWN); - return { webseed->base_url.c_str(), is_downloading, speed.base_quantity() }; + return std::make_unique(torrent, url, callback, callback_data); } diff --git a/libtransmission/webseed.h b/libtransmission/webseed.h index 3ac1bf434..8fef98ed4 100644 --- a/libtransmission/webseed.h +++ b/libtransmission/webseed.h @@ -9,6 +9,7 @@ #error only libtransmission should #include this header. #endif +#include #include #include "libtransmission/transmission.h" @@ -17,6 +18,20 @@ using tr_peer_callback_webseed = tr_peer_callback_generic; -tr_peer* tr_webseedNew(tr_torrent& torrent, std::string_view, tr_peer_callback_webseed callback, void* callback_data); +class tr_webseed : public tr_peer +{ +protected: + explicit tr_webseed(tr_torrent& tor_in) + : tr_peer{ tor_in } + { + } -tr_webseed_view tr_webseedView(tr_peer const* peer); +public: + [[nodiscard]] static std::unique_ptr create( + tr_torrent& torrent, + std::string_view, + tr_peer_callback_webseed callback, + void* callback_data); + + [[nodiscard]] virtual tr_webseed_view get_view() const = 0; +};