From 4461aa68d909968d6fc20b7743aa435edec14995 Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Tue, 14 Feb 2023 13:50:28 -0600 Subject: [PATCH] fix: handle block fragments that arrive from peers out-of-order (#4890) --- libtransmission/cache.cc | 2 +- libtransmission/cache.h | 2 +- libtransmission/peer-msgs.cc | 106 +++++++++++++++-------------- libtransmission/torrent.cc | 11 ++- libtransmission/webseed.cc | 2 +- tests/libtransmission/move-test.cc | 2 +- 6 files changed, 68 insertions(+), 57 deletions(-) diff --git a/libtransmission/cache.cc b/libtransmission/cache.cc index ef446b244..f7ae03dcc 100644 --- a/libtransmission/cache.cc +++ b/libtransmission/cache.cc @@ -140,7 +140,7 @@ Cache::Cache(tr_torrents& torrents, int64_t max_bytes) // --- -int Cache::writeBlock(tr_torrent_id_t tor_id, tr_block_index_t block, std::unique_ptr>& writeme) +int Cache::writeBlock(tr_torrent_id_t tor_id, tr_block_index_t block, std::unique_ptr> writeme) { auto const key = Key{ tor_id, block }; auto iter = std::lower_bound(std::begin(blocks_), std::end(blocks_), key, CompareCacheBlockByKey{}); diff --git a/libtransmission/cache.h b/libtransmission/cache.h index e25deb232..9d6455610 100644 --- a/libtransmission/cache.h +++ b/libtransmission/cache.h @@ -36,7 +36,7 @@ public: } // @return any error code from cacheTrim() - int writeBlock(tr_torrent_id_t tor, tr_block_index_t block, std::unique_ptr>& writeme); + int writeBlock(tr_torrent_id_t tor, tr_block_index_t block, std::unique_ptr> writeme); int readBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t len, uint8_t* setme); int prefetchBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t len); diff --git a/libtransmission/peer-msgs.cc b/libtransmission/peer-msgs.cc index 837dbb4b7..24e3359d6 100644 --- a/libtransmission/peer-msgs.cc +++ b/libtransmission/peer-msgs.cc @@ -22,6 +22,7 @@ #include "transmission.h" +#include "bitfield.h" #include "cache.h" #include "completion.h" #include "crypto-utils.h" @@ -188,7 +189,20 @@ struct tr_incoming uint8_t id = 0; // the protocol message, e.g. BtPeerMsgs::Piece uint32_t length = 0; // the full message payload length. Includes the +1 for id length std::optional block_req; // metadata for incoming blocks - std::map>> block_buf; // piece data for incoming blocks + + struct incoming_piece_data + { + explicit incoming_piece_data(uint32_t block_size) + : buf{ std::make_unique>(block_size) } + , have{ block_size } + { + } + + std::unique_ptr> buf; + tr_bitfield have; + }; + + std::map blocks; }; class tr_peerMsgsImpl; @@ -1397,7 +1411,7 @@ bool messageLengthIsCorrect(tr_peerMsgsImpl const* msg, uint8_t id, uint32_t len } } -int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr>& block_data, tr_block_index_t block); +int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr> block_data, tr_block_index_t block); ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_piece_bytes_read) { @@ -1406,7 +1420,8 @@ ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_piece_b logtrace(msgs, "In readBtPiece"); // If this is the first we've seen of the piece data, parse out the header - if (!msgs->incoming.block_req) + auto& incoming = msgs->incoming; + if (!incoming.block_req) { if (inlen < 8) { @@ -1416,67 +1431,58 @@ ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_piece_b auto req = peer_request{}; msgs->io->read_uint32(&req.index); msgs->io->read_uint32(&req.offset); - req.length = msgs->incoming.length - 9; + req.length = incoming.length - 9; logtrace(msgs, fmt::format(FMT_STRING("got incoming block header {:d}:{:d}->{:d}"), req.index, req.offset, req.length)); - msgs->incoming.block_req = req; + incoming.block_req = req; return READ_NOW; } - auto& req = msgs->incoming.block_req; + auto& req = incoming.block_req; auto const loc = msgs->torrent->pieceLoc(req->index, req->offset); auto const block = loc.block; auto const block_size = msgs->torrent->blockSize(block); - auto& block_buf = msgs->incoming.block_buf[block]; - if (!block_buf) + + auto const n_this_pass = std::min(size_t{ req->length }, inlen); + TR_ASSERT(loc.block_offset + n_this_pass <= block_size); + if (n_this_pass == 0) { - block_buf = std::make_unique>(); - block_buf->reserve(block_size); - } - - // read in another chunk of data - auto const n_left_in_block = block_size - std::size(*block_buf); - auto const n_left_in_req = size_t{ req->length }; - auto const n_to_read = std::min({ n_left_in_block, n_left_in_req, inlen }); - auto const old_length = std::size(*block_buf); - block_buf->resize(old_length + n_to_read); - msgs->io->read_bytes(&((*block_buf)[old_length]), n_to_read); - - msgs->publish(tr_peer_event::GotPieceData(n_to_read)); - *setme_piece_bytes_read += n_to_read; - logtrace( - msgs, - fmt::format( - FMT_STRING("got {:d} bytes for block {:d}:{:d}->{:d} ... {:d} remain in req, {:d} remain in block"), - n_to_read, - req->index, - req->offset, - req->length, - req->length, - block_size - std::size(*block_buf))); - - // if we didn't read enough to finish off the request, - // update the table and wait for more - if (n_to_read < n_left_in_req) - { - auto new_loc = msgs->torrent->byteLoc(loc.byte + n_to_read); - req->index = new_loc.piece; - req->offset = new_loc.piece_offset; - req->length -= n_to_read; return READ_LATER; } - // we've fully read this message + auto& incoming_block = incoming.blocks.try_emplace(block, block_size).first->second; + msgs->io->read_bytes(std::data(*incoming_block.buf) + loc.block_offset, n_this_pass); + + msgs->publish(tr_peer_event::GotPieceData(n_this_pass)); + *setme_piece_bytes_read += n_this_pass; + incoming_block.have.setSpan(loc.block_offset, loc.block_offset + n_this_pass); + logtrace(msgs, fmt::format("got {:d} bytes for req {:d}:{:d}->{:d}", n_this_pass, req->index, req->offset, req->length)); + + // if we haven't gotten the full response yet, + // update what part of `req` is unfulfilled and wait for more + if (req->length > n_this_pass) + { + req->length -= n_this_pass; + auto const new_loc = msgs->torrent->byteLoc(loc.byte + n_this_pass); + req->index = new_loc.piece; + req->offset = new_loc.piece_offset; + return READ_LATER; + } + + // we've got the entire response message req.reset(); msgs->state = AwaitingBt::Length; - // if we didn't read enough to finish off the block, - // update the table and wait for more - if (std::size(*block_buf) < block_size) + // if we haven't gotten the entire block yet, wait for more + if (!incoming_block.have.hasAll()) { return READ_LATER; } - return clientGotBlock(msgs, block_buf, block) != 0 ? READ_ERR : READ_NOW; + // we've got the entire block, so send it along. + auto block_buf = std::move(incoming_block.buf); + incoming.blocks.erase(block); // note: invalidates `incoming_block` local + auto const ok = clientGotBlock(msgs, std::move(block_buf), block) == 0; + return ok ? READ_NOW : READ_ERR; } ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) @@ -1744,7 +1750,7 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen) } /* returns 0 on success, or an errno on failure */ -int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr>& block_data, tr_block_index_t const block) +int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr> block_data, tr_block_index_t const block) { TR_ASSERT(msgs != nullptr); @@ -1760,7 +1766,6 @@ int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr>& if (std::size(*block_data) != msgs->torrent->blockSize(block)) { logdbg(msgs, fmt::format("wrong block size: expected {:d}, got {:d}", n_expected, std::size(*block_data))); - block_data->clear(); return EMSGSIZE; } @@ -1769,7 +1774,6 @@ int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr>& if (!tr_peerMgrDidPeerRequest(msgs->torrent, msgs, block)) { logdbg(msgs, "we didn't ask for this message..."); - block_data->clear(); return 0; } @@ -1777,19 +1781,17 @@ int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr>& if (msgs->torrent->hasPiece(loc.piece)) { logtrace(msgs, "we did ask for this message, but the piece is already complete..."); - block_data->clear(); return 0; } // NB: if writeBlock() fails the torrent may be paused. // If this happens, `msgs` will be a dangling pointer and must no longer be used. - if (auto const err = msgs->session->cache->writeBlock(tor->id(), block, block_data); err != 0) + if (auto const err = msgs->session->cache->writeBlock(tor->id(), block, std::move(block_data)); err != 0) { return err; } msgs->blame.set(loc.piece); - msgs->incoming.block_buf.erase(block); msgs->publish(tr_peer_event::GotBlock(tor->blockInfo(), block)); return 0; diff --git a/libtransmission/torrent.cc b/libtransmission/torrent.cc index 4b5a0b1de..cd96bac2c 100644 --- a/libtransmission/torrent.cc +++ b/libtransmission/torrent.cc @@ -2400,8 +2400,17 @@ void tr_torrentGotBlock(tr_torrent* tor, tr_block_index_t block) tor->setDirty(); tor->completion.addBlock(block); - if (auto const piece = tor->blockLoc(block).piece; tor->hasPiece(piece)) + + auto const block_loc = tor->blockLoc(block); + auto const first_piece = block_loc.piece; + auto const last_piece = tor->byteLoc(block_loc.byte + tor->blockSize(block) - 1).piece; + for (auto piece = first_piece; piece <= last_piece; ++piece) { + if (!tor->hasPiece(piece)) + { + continue; + } + if (tor->checkPiece(piece)) { onPieceCompleted(tor, piece); diff --git a/libtransmission/webseed.cc b/libtransmission/webseed.cc index 7f07dd149..8136139d8 100644 --- a/libtransmission/webseed.cc +++ b/libtransmission/webseed.cc @@ -352,7 +352,7 @@ public: { if (auto const* const tor = tr_torrentFindFromId(session_, tor_id_); tor != nullptr) { - session_->cache->writeBlock(tor_id_, block_, data_); + session_->cache->writeBlock(tor_id_, block_, std::move(data_)); webseed_->publish(tr_peer_event::GotBlock(tor->blockInfo(), block_)); } diff --git a/tests/libtransmission/move-test.cc b/tests/libtransmission/move-test.cc index cc5dc05ce..e06d1daef 100644 --- a/tests/libtransmission/move-test.cc +++ b/tests/libtransmission/move-test.cc @@ -84,7 +84,7 @@ TEST_P(IncompleteDirTest, incompleteDir) auto const test_incomplete_dir_threadfunc = [](TestIncompleteDirData* data) noexcept { - data->session->cache->writeBlock(data->tor->id(), data->block, data->buf); + data->session->cache->writeBlock(data->tor->id(), data->block, std::move(data->buf)); tr_torrentGotBlock(data->tor, data->block); data->done = true; };