From 723872431885010396b1b192dc18535819c4c76d Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Wed, 23 Feb 2022 07:38:18 -0600 Subject: [PATCH] refactor: webseed (#2689) * refactor: reduce nested conditional webseed logic * refactor: use tr_block_info::Location in webseed * request larger chunks at a time from webseeds * fix: CURLOPT_MAXREDIRS to safeguard against loops Discussion at https://trac.transmissionbt.com/ticket/6110 . Reported by @x190 and @cfpp2p a few years back. --- libtransmission/block-info.h | 8 + libtransmission/cache.cc | 6 +- libtransmission/peer-msgs.cc | 5 +- libtransmission/torrent.cc | 16 -- libtransmission/torrent.h | 7 - libtransmission/web.cc | 2 + libtransmission/webseed.cc | 395 +++++++++++++++++------------------ 7 files changed, 206 insertions(+), 233 deletions(-) diff --git a/libtransmission/block-info.h b/libtransmission/block-info.h index 85589caed..6802a5503 100644 --- a/libtransmission/block-info.h +++ b/libtransmission/block-info.h @@ -7,6 +7,8 @@ #include "transmission.h" +#include "tr-assert.h" + struct tr_block_info { uint64_t total_size = 0; @@ -99,6 +101,8 @@ struct tr_block_info // Location of the first byte in `block`. [[nodiscard]] Location constexpr blockLoc(tr_block_index_t block) const { + TR_ASSERT(block < n_blocks); + return byteLoc(uint64_t{ block } * blockSize()); } @@ -116,6 +120,8 @@ struct tr_block_info // Location of the first byte (+ optional offset and length) in `piece` [[nodiscard]] Location constexpr pieceLoc(tr_piece_index_t piece, uint32_t offset = 0, uint32_t length = 0) const { + TR_ASSERT(piece < n_pieces); + return byteLoc(uint64_t{ piece } * pieceSize() + offset + length); } @@ -133,6 +139,8 @@ struct tr_block_info // Location of the torrent's nth byte [[nodiscard]] Location constexpr byteLoc(uint64_t byte) const { + TR_ASSERT(byte <= total_size); + if (!isInitialized()) { return {}; diff --git a/libtransmission/cache.cc b/libtransmission/cache.cc index ac2f6b022..aef389a7d 100644 --- a/libtransmission/cache.cc +++ b/libtransmission/cache.cc @@ -315,9 +315,11 @@ int tr_cacheWriteBlock( struct evbuffer* writeme) { TR_ASSERT(tr_amInEventThread(torrent->session)); + TR_ASSERT(loc.block_offset == 0); + TR_ASSERT(torrent->blockSize(loc.block) == length); + TR_ASSERT(torrent->blockSize(loc.block) <= evbuffer_get_length(writeme)); - struct cache_block* cb = findBlock(cache, torrent, loc); - + auto* cb = findBlock(cache, torrent, loc); if (cb == nullptr) { cb = tr_new(struct cache_block, 1); diff --git a/libtransmission/peer-msgs.cc b/libtransmission/peer-msgs.cc index 95402cdd3..8e8322960 100644 --- a/libtransmission/peer-msgs.cc +++ b/libtransmission/peer-msgs.cc @@ -167,9 +167,8 @@ struct peer_request static peer_request blockToReq(tr_torrent const* tor, tr_block_index_t block) { - auto ret = peer_request{}; - tr_torrentGetBlockLocation(tor, block, &ret.index, &ret.offset, &ret.length); - return ret; + auto const loc = tor->blockLoc(block); + return peer_request{ loc.piece, loc.piece_offset, tor->blockSize(block) }; } /** diff --git a/libtransmission/torrent.cc b/libtransmission/torrent.cc index ff70169b8..eec414b52 100644 --- a/libtransmission/torrent.cc +++ b/libtransmission/torrent.cc @@ -1939,22 +1939,6 @@ uint16_t tr_torrentGetPeerLimit(tr_torrent const* tor) **** ***/ -void tr_torrentGetBlockLocation( - tr_torrent const* tor, - tr_block_index_t block, - tr_piece_index_t* piece, - uint32_t* offset, - uint32_t* length) -{ - uint64_t pos = block; - pos *= tor->blockSize(); - *piece = pos / tor->pieceSize(); - uint64_t piece_begin = tor->pieceSize(); - piece_begin *= *piece; - *offset = pos - piece_begin; - *length = tor->blockSize(block); -} - bool tr_torrentReqIsValid(tr_torrent const* tor, tr_piece_index_t index, uint32_t offset, uint32_t length) { TR_ASSERT(tr_isTorrent(tor)); diff --git a/libtransmission/torrent.h b/libtransmission/torrent.h index 48f8b26c7..740f119eb 100644 --- a/libtransmission/torrent.h +++ b/libtransmission/torrent.h @@ -73,13 +73,6 @@ tr_torrent* tr_torrentFindFromObfuscatedHash(tr_session* session, tr_sha1_digest bool tr_torrentReqIsValid(tr_torrent const* tor, tr_piece_index_t index, uint32_t offset, uint32_t length); -void tr_torrentGetBlockLocation( - tr_torrent const* tor, - tr_block_index_t block, - tr_piece_index_t* piece, - uint32_t* offset, - uint32_t* length); - tr_block_span_t tr_torGetFileBlockSpan(tr_torrent const* tor, tr_file_index_t file); void tr_torrentCheckSeedLimit(tr_torrent* tor); diff --git a/libtransmission/web.cc b/libtransmission/web.cc index 176d81fd2..5aca7da61 100644 --- a/libtransmission/web.cc +++ b/libtransmission/web.cc @@ -240,6 +240,7 @@ private: static auto constexpr BandwidthPauseMsec = long{ 500 }; static auto constexpr DnsCacheTimeoutSecs = long{ 60 * 60 }; + static auto constexpr MaxRedirects = long{ 10 }; bool const curl_verbose = tr_env_key_exists("TR_CURL_VERBOSE"); bool const curl_ssl_verify = !tr_env_key_exists("TR_CURL_SSL_NO_VERIFY"); @@ -364,6 +365,7 @@ private: (void)curl_easy_setopt(e, CURLOPT_VERBOSE, impl->curl_verbose ? 1L : 0L); (void)curl_easy_setopt(e, CURLOPT_WRITEDATA, task); (void)curl_easy_setopt(e, CURLOPT_WRITEFUNCTION, onDataReceived); + (void)curl_easy_setopt(e, CURLOPT_MAXREDIRS, MaxRedirects); if (auto const addrstr = impl->mediator.publicAddress(); addrstr) { diff --git a/libtransmission/webseed.cc b/libtransmission/webseed.cc index 9d8c27b80..e6ea241a0 100644 --- a/libtransmission/webseed.cc +++ b/libtransmission/webseed.cc @@ -28,7 +28,7 @@ using namespace std::literals; namespace { -struct tr_webseed; +class tr_webseed; void on_idle(tr_webseed* w); @@ -38,14 +38,12 @@ private: std::shared_ptr const content_{ evbuffer_new(), evbuffer_free }; public: - tr_webseed_task(tr_torrent* tor, tr_webseed* webseed_in, tr_block_span_t span) + tr_webseed_task(tr_torrent* tor, tr_webseed* webseed_in, tr_block_span_t blocks_in) : webseed{ webseed_in } , session{ tor->session } - , block{ span.begin } // TODO(ckerr): just own the loc - , piece_index{ tor->blockLoc(this->block).piece } - , piece_offset{ tor->blockLoc(this->block).piece_offset } - , block_size{ tor->blockSize() } - , length{ (span.end - 1 - span.begin) * tor->blockSize() + tor->blockSize(span.end - 1) } + , blocks{ blocks_in } + , end_byte{ tor->blockLoc(blocks.end - 1).byte + tor->blockSize(blocks.end - 1) } + , loc{ tor->blockLoc(blocks.begin) } { } @@ -57,14 +55,13 @@ public: } tr_session* const session; - tr_block_index_t const block; - tr_piece_index_t const piece_index; - uint32_t const piece_offset; - uint32_t const block_size; - uint32_t const length; + tr_block_span_t const blocks; + uint64_t const end_byte; + + // the current position in the task; i.e., the next block to save + tr_block_info::Location loc; bool dead = false; - tr_block_index_t blocks_done = 0; }; /** @@ -148,7 +145,7 @@ private: time_t paused_until = 0; }; -struct tr_webseed : public tr_peer +class tr_webseed : public tr_peer { public: tr_webseed(struct tr_torrent* tor, std::string_view url, tr_peer_callback callback_in, void* callback_data_in) @@ -174,7 +171,12 @@ public: tasks.clear(); } - bool is_transferring_pieces(uint64_t now, tr_direction direction, unsigned int* setme_Bps) const override + [[nodiscard]] tr_torrent* getTorrent() const + { + return tr_torrentFindFromId(session, torrent_id); + } + + [[nodiscard]] bool is_transferring_pieces(uint64_t now, tr_direction direction, unsigned int* setme_Bps) const override { unsigned int Bps = 0; bool is_active = false; @@ -193,6 +195,40 @@ public: return is_active; } + void gotPieceData(uint32_t n_bytes) + { + bandwidth.notifyBandwidthConsumed(TR_DOWN, n_bytes, true, tr_time_msec()); + publishClientGotPieceData(n_bytes); + connection_limiter.gotData(); + } + + void publishRejection(tr_block_span_t block_span) + { + auto e = tr_peer_event{}; + e.eventType = TR_PEER_CLIENT_GOT_REJ; + + for (auto block = block_span.begin; block < block_span.end; ++block) + { + auto const loc = getTorrent()->blockLoc(block); + e.pieceIndex = loc.piece; + e.offset = loc.piece_offset; + publish(&e); + } + } + + void publishGotBlock(tr_torrent const* tor, tr_block_info::Location loc) + { + TR_ASSERT(loc.block_offset == 0); + TR_ASSERT(loc.block < tor->blockCount()); + + auto e = tr_peer_event{}; + e.eventType = TR_PEER_CLIENT_GOT_BLOCK; + e.pieceIndex = loc.piece; + e.offset = loc.piece_offset; + e.length = tor->blockSize(loc.block); + publish(&e); + } + int const torrent_id; std::string const base_url; tr_peer_callback const callback; @@ -203,6 +239,22 @@ public: std::set tasks; private: + void publish(tr_peer_event* event) + { + if (callback != nullptr) + { + (*callback)(this, event, callback_data); + } + } + + void publishClientGotPieceData(uint32_t length) + { + auto e = tr_peer_event{}; + e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA; + e.length = length; + publish(&e); + } + void startTimer() { tr_timerAddMsec(pulse_timer.get(), IdleTimerMsec); @@ -223,73 +275,19 @@ private: **** ***/ -void publish(tr_webseed* w, tr_peer_event* e) -{ - if (w->callback != nullptr) - { - (*w->callback)(w, e, w->callback_data); - } -} - -void fire_client_got_rejs(tr_torrent* tor, tr_webseed* w, tr_block_index_t block, tr_block_index_t count) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_CLIENT_GOT_REJ; - tr_torrentGetBlockLocation(tor, block, &e.pieceIndex, &e.offset, &e.length); - - for (tr_block_index_t i = 1; i <= count; i++) - { - if (i == count) - { - e.length = tor->blockSize(block + count - 1); - } - - publish(w, &e); - e.offset += e.length; - } -} - -void fire_client_got_blocks(tr_torrent* tor, tr_webseed* w, tr_block_index_t block, tr_block_index_t count) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_CLIENT_GOT_BLOCK; - tr_torrentGetBlockLocation(tor, block, &e.pieceIndex, &e.offset, &e.length); - - for (tr_block_index_t i = 1; i <= count; i++) - { - if (i == count) - { - e.length = tor->blockSize(block + count - 1); - } - - publish(w, &e); - e.offset += e.length; - } -} - -void fire_client_got_piece_data(tr_webseed* w, uint32_t length) -{ - auto e = tr_peer_event{}; - e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA; - e.length = length; - publish(w, &e); -} - -/*** -**** -***/ - struct write_block_data { private: std::shared_ptr const content_{ evbuffer_new(), evbuffer_free }; public: - write_block_data(tr_session* session_in, int torrent_id_in, tr_webseed* webseed_in) + write_block_data(tr_session* session_in, int torrent_id_in, tr_webseed* webseed_in, tr_block_info::Location loc_in) : session{ session_in } , torrent_id{ torrent_id_in } , webseed{ webseed_in } + , loc{ loc_in } { + TR_ASSERT(loc.block_offset == 0); } [[nodiscard]] auto* content() const @@ -297,106 +295,121 @@ public: return content_.get(); } + static void write_block_func(void* vdata) + { + auto* const data = static_cast(vdata); + auto* const webseed = data->webseed; + auto* const buf = data->content(); + + auto* const tor = tr_torrentFindFromId(data->session, data->torrent_id); + if (tor == nullptr) + { + delete data; + return; + } + + auto const len = evbuffer_get_length(buf); + TR_ASSERT(tor->blockSize(data->loc.block) == len); + tr_cacheWriteBlock(tor->session->cache, tor, data->loc, len, buf); + webseed->publishGotBlock(tor, data->loc); + TR_ASSERT(evbuffer_get_length(buf) == 0); + delete data; + } + +private: tr_session* const session; int const torrent_id; tr_webseed* const webseed; - - tr_piece_index_t piece_index; - tr_block_index_t block_index; - tr_block_index_t count; - uint32_t block_offset; + tr_block_info::Location const loc; }; -void write_block_func(void* vdata) +void useFetchedBlocks(tr_webseed_task* task) { - auto* const data = static_cast(vdata); - struct tr_webseed* const w = data->webseed; - auto* const buf = data->content(); + auto* const session = task->session; + auto const lock = session->unique_lock(); - auto* const tor = tr_torrentFindFromId(data->session, data->torrent_id); - if (tor != nullptr) + auto* const webseed = task->webseed; + auto const* const tor = webseed->getTorrent(); + if (tor == nullptr) { - uint32_t const block_size = tor->blockSize(); - uint32_t len = evbuffer_get_length(buf); - uint32_t const offset_end = data->block_offset + len; - tr_cache* cache = data->session->cache; - tr_piece_index_t const piece = data->piece_index; - - if (!tor->hasPiece(piece)) - { - while (len > 0) - { - uint32_t const bytes_this_pass = std::min(len, block_size); - tr_cacheWriteBlock(cache, tor, tor->pieceLoc(piece, offset_end - len), bytes_this_pass, buf); - len -= bytes_this_pass; - } - - fire_client_got_blocks(tor, w, data->block_index, data->count); - } + return; } - delete data; + auto* const buf = task->content(); + for (;;) + { + auto const block_size = tor->blockSize(task->loc.block); + if (evbuffer_get_length(buf) < block_size) + { + break; + } + + if (tor->hasBlock(task->loc.block)) + { + evbuffer_drain(buf, block_size); + } + else + { + auto* const data = new write_block_data{ session, tor->uniqueId, webseed, task->loc }; + evbuffer_remove_buffer(task->content(), data->content(), block_size); + tr_runInEventThread(session, write_block_data::write_block_func, data); + } + + task->loc = tor->byteLoc(task->loc.byte + block_size); + + TR_ASSERT(task->loc.byte == task->end_byte || task->loc.block_offset == 0); + } } /*** **** ***/ -void on_content_changed(evbuffer* buf, evbuffer_cb_info const* info, void* vtask) +void onBufferGotData(evbuffer* /*buf*/, evbuffer_cb_info const* info, void* vtask) { size_t const n_added = info->n_added; auto* const task = static_cast(vtask); + if (n_added == 0 || task->dead) + { + return; + } + auto* const session = task->session; auto const lock = session->unique_lock(); - if (!task->dead && n_added > 0) - { - auto* const w = task->webseed; + auto* const webseed = task->webseed; + webseed->gotPieceData(n_added); - w->bandwidth.notifyBandwidthConsumed(TR_DOWN, n_added, true, tr_time_msec()); - fire_client_got_piece_data(w, n_added); - uint32_t const len = evbuffer_get_length(buf); - - task->webseed->connection_limiter.gotData(); - - if (len >= task->block_size) - { - /* once we've got at least one full block, save it */ - - uint32_t const block_size = task->block_size; - tr_block_index_t const completed = len / block_size; - - auto* const data = new write_block_data{ session, w->torrent_id, task->webseed }; - data->piece_index = task->piece_index; - data->block_index = task->block + task->blocks_done; - data->count = completed; - data->block_offset = task->piece_offset + task->blocks_done * block_size; - - /* we don't use locking on this evbuffer so we must copy out the data - that will be needed when writing the block in a different thread */ - evbuffer_remove_buffer(task->content(), data->content(), (size_t)block_size * (size_t)completed); - - tr_runInEventThread(w->session, write_block_func, data); - task->blocks_done += completed; - } - } + useFetchedBlocks(task); } void task_request_next_chunk(tr_webseed_task* task); void on_idle(tr_webseed* w) { - auto* const tor = tr_torrentFindFromId(w->session, w->torrent_id); + auto* const tor = w->getTorrent(); if (tor == nullptr || !tor->isRunning || tor->isDone()) { return; } - for (auto const span : tr_peerMgrGetNextRequests(tor, w, w->connection_limiter.slotsAvailable())) + auto const slots_available = w->connection_limiter.slotsAvailable(); + if (slots_available == 0) { + return; + } + + // Prefer to request large, contiguous chunks from webseeds. + // The actual value of '64' is arbitrary here; we could probably + // be smarter about this. + auto constexpr PreferredBlocksPerTask = size_t{ 64 }; + auto const spans = tr_peerMgrGetNextRequests(tor, w, slots_available * PreferredBlocksPerTask); + for (size_t i = 0; i < slots_available && i < std::size(spans); ++i) + { + auto const& span = spans[i]; w->connection_limiter.taskStarted(); auto* const task = new tr_webseed_task{ tor, w, span }; - evbuffer_add_cb(task->content(), on_content_changed, task); + evbuffer_add_cb(task->content(), onBufferGotData, task); w->tasks.insert(task); task_request_next_chunk(task); @@ -409,68 +422,45 @@ void onPartialDataFetched(tr_web::FetchResponse const& web_response) auto const& [status, body, did_connect, did_timeout, vtask] = web_response; bool const success = status == 206; - auto* const t = static_cast(vtask); - auto* const session = t->session; - auto* const w = t->webseed; + auto* const task = static_cast(vtask); + auto* const webseed = task->webseed; - w->connection_limiter.taskFinished(success); + webseed->connection_limiter.taskFinished(success); - if (t->dead) + if (task->dead) { - delete t; + delete task; return; } - tr_torrent* tor = tr_torrentFindFromId(session, w->torrent_id); - - if (tor != nullptr) + auto* const tor = webseed->getTorrent(); + if (tor == nullptr) { - if (!success) - { - tr_block_index_t const blocks_remain = (t->length + tor->blockSize() - 1) / tor->blockSize() - t->blocks_done; - - if (blocks_remain != 0) - { - fire_client_got_rejs(tor, w, t->block + t->blocks_done, blocks_remain); - } - - w->tasks.erase(t); - delete t; - } - else - { - uint32_t const bytes_done = t->blocks_done * tor->blockSize(); - uint32_t const buf_len = evbuffer_get_length(t->content()); - - if (bytes_done + buf_len < t->length) - { - /* request finished successfully but there's still data missing. that - means we've reached the end of a file and need to request the next one */ - task_request_next_chunk(t); - } - else - { - if (buf_len != 0 && !tor->hasPiece(t->piece_index)) - { - /* on_content_changed() will not write a block if it is smaller than - the torrent's block size, i.e. the torrent's very last block */ - tr_cacheWriteBlock( - session->cache, - tor, - tor->pieceLoc(t->piece_index, t->piece_offset + bytes_done), - buf_len, - t->content()); - - fire_client_got_blocks(tor, t->webseed, t->block + t->blocks_done, 1); - } - - w->tasks.erase(t); - delete t; - - on_idle(w); - } - } + return; } + + if (!success) + { + webseed->publishRejection({ task->loc.block, task->blocks.end }); + webseed->tasks.erase(task); + delete task; + return; + } + + if (task->loc.byte < task->end_byte) + { + // Request finished successfully but there's still data missing. + // That means we've reached the end of a file and need to request + // the next one + task_request_next_chunk(task); + return; + } + + TR_ASSERT(evbuffer_get_length(task->content()) == 0); + webseed->tasks.erase(task); + delete task; + + on_idle(webseed); } std::string make_url(tr_webseed* w, std::string_view name) @@ -485,31 +475,26 @@ std::string make_url(tr_webseed* w, std::string_view name) return url; } -void task_request_next_chunk(tr_webseed_task* t) +void task_request_next_chunk(tr_webseed_task* task) { - tr_webseed* w = t->webseed; - - tr_torrent* const tor = tr_torrentFindFromId(w->session, w->torrent_id); + auto* const webseed = task->webseed; + auto* const tor = webseed->getTorrent(); if (tor == nullptr) { return; } - auto const piece_size = tor->pieceSize(); - uint64_t const remain = t->length - t->blocks_done * tor->blockSize() - evbuffer_get_length(t->content()); + auto const [file_index, file_offset] = tor->fileOffset(task->loc); - auto const total_offset = tor->pieceLoc(t->piece_index, t->piece_offset, t->length - remain).byte; - tr_piece_index_t const step_piece = total_offset / piece_size; - uint64_t const step_piece_offset = total_offset - uint64_t(piece_size) * step_piece; + auto const left_in_file = tor->fileSize(file_index) - file_offset; + auto const left_in_task = task->end_byte - task->loc.byte; + auto const this_chunk = std::min(left_in_file, left_in_task); - auto const [file_index, file_offset] = tor->fileOffset(tor->pieceLoc(step_piece, step_piece_offset)); - uint64_t this_pass = std::min(remain, tor->fileSize(file_index) - file_offset); - - auto const url = make_url(t->webseed, tor->fileSubpath(file_index)); - auto options = tr_web::FetchOptions{ url, onPartialDataFetched, t }; - options.range = tr_strvJoin(std::to_string(file_offset), "-"sv, std::to_string(file_offset + this_pass - 1)); + auto const url = make_url(webseed, tor->fileSubpath(file_index)); + auto options = tr_web::FetchOptions{ url, onPartialDataFetched, task }; + options.range = tr_strvJoin(std::to_string(file_offset), "-"sv, std::to_string(file_offset + this_chunk - 1)); options.speed_limit_tag = tor->uniqueId; - options.buffer = t->content(); + options.buffer = task->content(); tor->session->web->fetch(std::move(options)); }