mirror of
https://github.com/transmission/transmission
synced 2025-01-30 10:52:00 +00:00
fix: handle block fragments that arrive from peers out-of-order (#4890)
This commit is contained in:
parent
9d91d1e969
commit
4461aa68d9
6 changed files with 68 additions and 57 deletions
|
@ -140,7 +140,7 @@ Cache::Cache(tr_torrents& torrents, int64_t max_bytes)
|
|||
|
||||
// ---
|
||||
|
||||
int Cache::writeBlock(tr_torrent_id_t tor_id, tr_block_index_t block, std::unique_ptr<std::vector<uint8_t>>& writeme)
|
||||
int Cache::writeBlock(tr_torrent_id_t tor_id, tr_block_index_t block, std::unique_ptr<std::vector<uint8_t>> writeme)
|
||||
{
|
||||
auto const key = Key{ tor_id, block };
|
||||
auto iter = std::lower_bound(std::begin(blocks_), std::end(blocks_), key, CompareCacheBlockByKey{});
|
||||
|
|
|
@ -36,7 +36,7 @@ public:
|
|||
}
|
||||
|
||||
// @return any error code from cacheTrim()
|
||||
int writeBlock(tr_torrent_id_t tor, tr_block_index_t block, std::unique_ptr<std::vector<uint8_t>>& writeme);
|
||||
int writeBlock(tr_torrent_id_t tor, tr_block_index_t block, std::unique_ptr<std::vector<uint8_t>> writeme);
|
||||
|
||||
int readBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t len, uint8_t* setme);
|
||||
int prefetchBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t len);
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
|
||||
#include "transmission.h"
|
||||
|
||||
#include "bitfield.h"
|
||||
#include "cache.h"
|
||||
#include "completion.h"
|
||||
#include "crypto-utils.h"
|
||||
|
@ -188,7 +189,20 @@ struct tr_incoming
|
|||
uint8_t id = 0; // the protocol message, e.g. BtPeerMsgs::Piece
|
||||
uint32_t length = 0; // the full message payload length. Includes the +1 for id length
|
||||
std::optional<peer_request> block_req; // metadata for incoming blocks
|
||||
std::map<tr_block_index_t, std::unique_ptr<std::vector<uint8_t>>> block_buf; // piece data for incoming blocks
|
||||
|
||||
struct incoming_piece_data
|
||||
{
|
||||
explicit incoming_piece_data(uint32_t block_size)
|
||||
: buf{ std::make_unique<std::vector<uint8_t>>(block_size) }
|
||||
, have{ block_size }
|
||||
{
|
||||
}
|
||||
|
||||
std::unique_ptr<std::vector<uint8_t>> buf;
|
||||
tr_bitfield have;
|
||||
};
|
||||
|
||||
std::map<tr_block_index_t, incoming_piece_data> blocks;
|
||||
};
|
||||
|
||||
class tr_peerMsgsImpl;
|
||||
|
@ -1397,7 +1411,7 @@ bool messageLengthIsCorrect(tr_peerMsgsImpl const* msg, uint8_t id, uint32_t len
|
|||
}
|
||||
}
|
||||
|
||||
int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>>& block_data, tr_block_index_t block);
|
||||
int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>> block_data, tr_block_index_t block);
|
||||
|
||||
ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_piece_bytes_read)
|
||||
{
|
||||
|
@ -1406,7 +1420,8 @@ ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_piece_b
|
|||
logtrace(msgs, "In readBtPiece");
|
||||
|
||||
// If this is the first we've seen of the piece data, parse out the header
|
||||
if (!msgs->incoming.block_req)
|
||||
auto& incoming = msgs->incoming;
|
||||
if (!incoming.block_req)
|
||||
{
|
||||
if (inlen < 8)
|
||||
{
|
||||
|
@ -1416,67 +1431,58 @@ ReadState readBtPiece(tr_peerMsgsImpl* msgs, size_t inlen, size_t* setme_piece_b
|
|||
auto req = peer_request{};
|
||||
msgs->io->read_uint32(&req.index);
|
||||
msgs->io->read_uint32(&req.offset);
|
||||
req.length = msgs->incoming.length - 9;
|
||||
req.length = incoming.length - 9;
|
||||
logtrace(msgs, fmt::format(FMT_STRING("got incoming block header {:d}:{:d}->{:d}"), req.index, req.offset, req.length));
|
||||
msgs->incoming.block_req = req;
|
||||
incoming.block_req = req;
|
||||
return READ_NOW;
|
||||
}
|
||||
|
||||
auto& req = msgs->incoming.block_req;
|
||||
auto& req = incoming.block_req;
|
||||
auto const loc = msgs->torrent->pieceLoc(req->index, req->offset);
|
||||
auto const block = loc.block;
|
||||
auto const block_size = msgs->torrent->blockSize(block);
|
||||
auto& block_buf = msgs->incoming.block_buf[block];
|
||||
if (!block_buf)
|
||||
|
||||
auto const n_this_pass = std::min(size_t{ req->length }, inlen);
|
||||
TR_ASSERT(loc.block_offset + n_this_pass <= block_size);
|
||||
if (n_this_pass == 0)
|
||||
{
|
||||
block_buf = std::make_unique<std::vector<uint8_t>>();
|
||||
block_buf->reserve(block_size);
|
||||
}
|
||||
|
||||
// read in another chunk of data
|
||||
auto const n_left_in_block = block_size - std::size(*block_buf);
|
||||
auto const n_left_in_req = size_t{ req->length };
|
||||
auto const n_to_read = std::min({ n_left_in_block, n_left_in_req, inlen });
|
||||
auto const old_length = std::size(*block_buf);
|
||||
block_buf->resize(old_length + n_to_read);
|
||||
msgs->io->read_bytes(&((*block_buf)[old_length]), n_to_read);
|
||||
|
||||
msgs->publish(tr_peer_event::GotPieceData(n_to_read));
|
||||
*setme_piece_bytes_read += n_to_read;
|
||||
logtrace(
|
||||
msgs,
|
||||
fmt::format(
|
||||
FMT_STRING("got {:d} bytes for block {:d}:{:d}->{:d} ... {:d} remain in req, {:d} remain in block"),
|
||||
n_to_read,
|
||||
req->index,
|
||||
req->offset,
|
||||
req->length,
|
||||
req->length,
|
||||
block_size - std::size(*block_buf)));
|
||||
|
||||
// if we didn't read enough to finish off the request,
|
||||
// update the table and wait for more
|
||||
if (n_to_read < n_left_in_req)
|
||||
{
|
||||
auto new_loc = msgs->torrent->byteLoc(loc.byte + n_to_read);
|
||||
req->index = new_loc.piece;
|
||||
req->offset = new_loc.piece_offset;
|
||||
req->length -= n_to_read;
|
||||
return READ_LATER;
|
||||
}
|
||||
|
||||
// we've fully read this message
|
||||
auto& incoming_block = incoming.blocks.try_emplace(block, block_size).first->second;
|
||||
msgs->io->read_bytes(std::data(*incoming_block.buf) + loc.block_offset, n_this_pass);
|
||||
|
||||
msgs->publish(tr_peer_event::GotPieceData(n_this_pass));
|
||||
*setme_piece_bytes_read += n_this_pass;
|
||||
incoming_block.have.setSpan(loc.block_offset, loc.block_offset + n_this_pass);
|
||||
logtrace(msgs, fmt::format("got {:d} bytes for req {:d}:{:d}->{:d}", n_this_pass, req->index, req->offset, req->length));
|
||||
|
||||
// if we haven't gotten the full response yet,
|
||||
// update what part of `req` is unfulfilled and wait for more
|
||||
if (req->length > n_this_pass)
|
||||
{
|
||||
req->length -= n_this_pass;
|
||||
auto const new_loc = msgs->torrent->byteLoc(loc.byte + n_this_pass);
|
||||
req->index = new_loc.piece;
|
||||
req->offset = new_loc.piece_offset;
|
||||
return READ_LATER;
|
||||
}
|
||||
|
||||
// we've got the entire response message
|
||||
req.reset();
|
||||
msgs->state = AwaitingBt::Length;
|
||||
|
||||
// if we didn't read enough to finish off the block,
|
||||
// update the table and wait for more
|
||||
if (std::size(*block_buf) < block_size)
|
||||
// if we haven't gotten the entire block yet, wait for more
|
||||
if (!incoming_block.have.hasAll())
|
||||
{
|
||||
return READ_LATER;
|
||||
}
|
||||
|
||||
return clientGotBlock(msgs, block_buf, block) != 0 ? READ_ERR : READ_NOW;
|
||||
// we've got the entire block, so send it along.
|
||||
auto block_buf = std::move(incoming_block.buf);
|
||||
incoming.blocks.erase(block); // note: invalidates `incoming_block` local
|
||||
auto const ok = clientGotBlock(msgs, std::move(block_buf), block) == 0;
|
||||
return ok ? READ_NOW : READ_ERR;
|
||||
}
|
||||
|
||||
ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen)
|
||||
|
@ -1744,7 +1750,7 @@ ReadState readBtMessage(tr_peerMsgsImpl* msgs, size_t inlen)
|
|||
}
|
||||
|
||||
/* returns 0 on success, or an errno on failure */
|
||||
int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>>& block_data, tr_block_index_t const block)
|
||||
int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>> block_data, tr_block_index_t const block)
|
||||
{
|
||||
TR_ASSERT(msgs != nullptr);
|
||||
|
||||
|
@ -1760,7 +1766,6 @@ int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>>&
|
|||
if (std::size(*block_data) != msgs->torrent->blockSize(block))
|
||||
{
|
||||
logdbg(msgs, fmt::format("wrong block size: expected {:d}, got {:d}", n_expected, std::size(*block_data)));
|
||||
block_data->clear();
|
||||
return EMSGSIZE;
|
||||
}
|
||||
|
||||
|
@ -1769,7 +1774,6 @@ int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>>&
|
|||
if (!tr_peerMgrDidPeerRequest(msgs->torrent, msgs, block))
|
||||
{
|
||||
logdbg(msgs, "we didn't ask for this message...");
|
||||
block_data->clear();
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1777,19 +1781,17 @@ int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>>&
|
|||
if (msgs->torrent->hasPiece(loc.piece))
|
||||
{
|
||||
logtrace(msgs, "we did ask for this message, but the piece is already complete...");
|
||||
block_data->clear();
|
||||
return 0;
|
||||
}
|
||||
|
||||
// NB: if writeBlock() fails the torrent may be paused.
|
||||
// If this happens, `msgs` will be a dangling pointer and must no longer be used.
|
||||
if (auto const err = msgs->session->cache->writeBlock(tor->id(), block, block_data); err != 0)
|
||||
if (auto const err = msgs->session->cache->writeBlock(tor->id(), block, std::move(block_data)); err != 0)
|
||||
{
|
||||
return err;
|
||||
}
|
||||
|
||||
msgs->blame.set(loc.piece);
|
||||
msgs->incoming.block_buf.erase(block);
|
||||
msgs->publish(tr_peer_event::GotBlock(tor->blockInfo(), block));
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -2400,8 +2400,17 @@ void tr_torrentGotBlock(tr_torrent* tor, tr_block_index_t block)
|
|||
tor->setDirty();
|
||||
|
||||
tor->completion.addBlock(block);
|
||||
if (auto const piece = tor->blockLoc(block).piece; tor->hasPiece(piece))
|
||||
|
||||
auto const block_loc = tor->blockLoc(block);
|
||||
auto const first_piece = block_loc.piece;
|
||||
auto const last_piece = tor->byteLoc(block_loc.byte + tor->blockSize(block) - 1).piece;
|
||||
for (auto piece = first_piece; piece <= last_piece; ++piece)
|
||||
{
|
||||
if (!tor->hasPiece(piece))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tor->checkPiece(piece))
|
||||
{
|
||||
onPieceCompleted(tor, piece);
|
||||
|
|
|
@ -352,7 +352,7 @@ public:
|
|||
{
|
||||
if (auto const* const tor = tr_torrentFindFromId(session_, tor_id_); tor != nullptr)
|
||||
{
|
||||
session_->cache->writeBlock(tor_id_, block_, data_);
|
||||
session_->cache->writeBlock(tor_id_, block_, std::move(data_));
|
||||
webseed_->publish(tr_peer_event::GotBlock(tor->blockInfo(), block_));
|
||||
}
|
||||
|
||||
|
|
|
@ -84,7 +84,7 @@ TEST_P(IncompleteDirTest, incompleteDir)
|
|||
|
||||
auto const test_incomplete_dir_threadfunc = [](TestIncompleteDirData* data) noexcept
|
||||
{
|
||||
data->session->cache->writeBlock(data->tor->id(), data->block, data->buf);
|
||||
data->session->cache->writeBlock(data->tor->id(), data->block, std::move(data->buf));
|
||||
tr_torrentGotBlock(data->tor, data->block);
|
||||
data->done = true;
|
||||
};
|
||||
|
|
Loading…
Reference in a new issue