fix: requests across piece boundaries when piece is not a multiple of block_size

Fixes #3324.
This commit is contained in:
Charles Kerr 2022-06-19 23:08:58 -05:00 committed by GitHub
parent b983a3ba5c
commit 85c11b7f03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 161 additions and 204 deletions

View File

@ -76,13 +76,13 @@ int Cache::writeContiguous(CIter const begin, CIter const end) const
begin, begin,
end, end,
size_t{}, size_t{},
[](size_t sum, auto const& block) { return sum + std::size(block.buf); }); [](size_t sum, auto const& block) { return sum + std::size(*block.buf); });
buf.reserve(buflen); buf.reserve(buflen);
for (auto iter = begin; iter != end; ++iter) for (auto iter = begin; iter != end; ++iter)
{ {
TR_ASSERT(begin->key.first == iter->key.first); TR_ASSERT(begin->key.first == iter->key.first);
TR_ASSERT(begin->key.second + std::distance(begin, iter) == iter->key.second); TR_ASSERT(begin->key.second + std::distance(begin, iter) == iter->key.second);
buf.insert(std::end(buf), std::begin(iter->buf), std::end(iter->buf)); buf.insert(std::end(buf), std::begin(*iter->buf), std::end(*iter->buf));
} }
TR_ASSERT(std::size(buf) == buflen); TR_ASSERT(std::size(buf) == buflen);
@ -126,14 +126,9 @@ Cache::Cache(tr_torrents& torrents, int64_t max_bytes)
**** ****
***/ ***/
int Cache::writeBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t length, struct evbuffer* writeme) void Cache::writeBlock(tr_torrent_id_t tor_id, tr_block_index_t block, std::unique_ptr<std::vector<uint8_t>>& data)
{ {
TR_ASSERT(tr_amInEventThread(torrent->session)); auto const key = Key{ tor_id, block };
TR_ASSERT(loc.block_offset == 0);
TR_ASSERT(torrent->blockSize(loc.block) == length);
TR_ASSERT(torrent->blockSize(loc.block) <= evbuffer_get_length(writeme));
auto const key = makeKey(torrent, loc);
auto iter = std::lower_bound(std::begin(blocks_), std::end(blocks_), key, CompareCacheBlockByKey{}); auto iter = std::lower_bound(std::begin(blocks_), std::end(blocks_), key, CompareCacheBlockByKey{});
if (iter == std::end(blocks_) || iter->key != key) if (iter == std::end(blocks_) || iter->key != key)
{ {
@ -143,13 +138,12 @@ int Cache::writeBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t
iter->time_added = tr_time(); iter->time_added = tr_time();
iter->buf.resize(length); iter->buf = std::move(data);
evbuffer_remove(writeme, std::data(iter->buf), std::size(iter->buf));
++cache_writes_; ++cache_writes_;
cache_write_bytes_ += length; cache_write_bytes_ += std::size(*iter->buf);
return cacheTrim(); (void)cacheTrim();
} }
Cache::CIter Cache::getBlock(tr_torrent const* torrent, tr_block_info::Location loc) noexcept Cache::CIter Cache::getBlock(tr_torrent const* torrent, tr_block_info::Location loc) noexcept
@ -171,7 +165,7 @@ int Cache::readBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t
{ {
if (auto const iter = getBlock(torrent, loc); iter != std::end(blocks_)) if (auto const iter = getBlock(torrent, loc); iter != std::end(blocks_))
{ {
std::copy_n(std::begin(iter->buf), len, setme); std::copy_n(std::begin(*iter->buf), len, setme);
return {}; return {};
} }

View File

@ -10,6 +10,7 @@
#endif #endif
#include <cstdint> // intX_t, uintX_t #include <cstdint> // intX_t, uintX_t
#include <memory> // std::unique_ptr
#include <utility> // std::pair #include <utility> // std::pair
#include <vector> #include <vector>
@ -33,7 +34,7 @@ public:
return max_bytes_; return max_bytes_;
} }
int writeBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t length, struct evbuffer* writeme); void writeBlock(tr_torrent_id_t tor, tr_block_index_t block, std::unique_ptr<std::vector<uint8_t>>& writeme);
int readBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t len, uint8_t* setme); int readBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t len, uint8_t* setme);
int prefetchBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t len); int prefetchBlock(tr_torrent* torrent, tr_block_info::Location loc, uint32_t len);
int flushTorrent(tr_torrent* torrent); int flushTorrent(tr_torrent* torrent);
@ -45,7 +46,7 @@ private:
struct CacheBlock struct CacheBlock
{ {
Key key; Key key;
std::vector<uint8_t> buf; std::unique_ptr<std::vector<uint8_t>> buf;
time_t time_added = {}; time_t time_added = {};
}; };

View File

@ -1123,30 +1123,6 @@ void evbuffer_add_uint64(struct evbuffer* outbuf, uint64_t addme_hll)
**** ****
***/ ***/
static inline void maybeDecryptBuffer(tr_peerIo* io, struct evbuffer* buf, size_t offset, size_t size)
{
if (io->encryption_type == PEER_ENCRYPTION_RC4)
{
processBuffer(&io->crypto, buf, offset, size, &tr_cryptoDecrypt);
}
}
void tr_peerIoReadBytesToBuf(tr_peerIo* io, struct evbuffer* inbuf, struct evbuffer* outbuf, size_t byteCount)
{
TR_ASSERT(tr_isPeerIo(io));
TR_ASSERT(evbuffer_get_length(inbuf) >= byteCount);
size_t const old_length = evbuffer_get_length(outbuf);
/* append it to outbuf */
struct evbuffer* tmp = evbuffer_new();
evbuffer_remove_buffer(inbuf, tmp, byteCount);
evbuffer_add_buffer(outbuf, tmp);
evbuffer_free(tmp);
maybeDecryptBuffer(io, outbuf, old_length, byteCount);
}
void tr_peerIoReadBytes(tr_peerIo* io, struct evbuffer* inbuf, void* bytes, size_t byteCount) void tr_peerIoReadBytes(tr_peerIo* io, struct evbuffer* inbuf, void* bytes, size_t byteCount)
{ {
TR_ASSERT(tr_isPeerIo(io)); TR_ASSERT(tr_isPeerIo(io));

View File

@ -325,8 +325,6 @@ static inline void evbuffer_add_hton_64(struct evbuffer* buf, uint64_t val)
evbuffer_add_uint64(buf, val); evbuffer_add_uint64(buf, val);
} }
void tr_peerIoReadBytesToBuf(tr_peerIo* io, struct evbuffer* inbuf, struct evbuffer* outbuf, size_t byteCount);
void tr_peerIoReadBytes(tr_peerIo* io, struct evbuffer* inbuf, void* bytes, size_t byteCount); void tr_peerIoReadBytes(tr_peerIo* io, struct evbuffer* inbuf, void* bytes, size_t byteCount);
static inline void tr_peerIoReadUint8(tr_peerIo* io, struct evbuffer* inbuf, uint8_t* setme) static inline void tr_peerIoReadUint8(tr_peerIo* io, struct evbuffer* inbuf, uint8_t* setme)

View File

@ -7,6 +7,7 @@
#include <cstddef> #include <cstddef>
#include <iterator> #include <iterator>
#include <numeric> #include <numeric>
#include <set>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -136,9 +137,6 @@ std::vector<tr_block_span_t> Wishlist::next(Wishlist::Mediator const& mediator,
return {}; return {};
} }
size_t n_blocks = 0;
auto spans = std::vector<tr_block_span_t>{};
// We usually won't need all the candidates until endgame, so don't // We usually won't need all the candidates until endgame, so don't
// waste cycles sorting all of them here. partial sort is enough. // waste cycles sorting all of them here. partial sort is enough.
auto candidates = getCandidates(mediator); auto candidates = getCandidates(mediator);
@ -146,19 +144,18 @@ std::vector<tr_block_span_t> Wishlist::next(Wishlist::Mediator const& mediator,
auto const middle = std::min(std::size(candidates), MaxSortedPieces); auto const middle = std::min(std::size(candidates), MaxSortedPieces);
std::partial_sort(std::begin(candidates), std::begin(candidates) + middle, std::end(candidates)); std::partial_sort(std::begin(candidates), std::begin(candidates) + middle, std::end(candidates));
auto blocks = std::set<tr_block_index_t>{};
for (auto const& candidate : candidates) for (auto const& candidate : candidates)
{ {
// do we have enough? // do we have enough?
if (n_blocks >= n_wanted_blocks) if (std::size(blocks) >= n_wanted_blocks)
{ {
break; break;
} }
// walk the blocks in this piece // walk the blocks in this piece
auto const [begin, end] = mediator.blockSpan(candidate.piece); auto const [begin, end] = mediator.blockSpan(candidate.piece);
auto blocks = std::vector<tr_block_index_t>{}; for (tr_block_index_t block = begin; block < end && std::size(blocks) < n_wanted_blocks; ++block)
blocks.reserve(end - begin);
for (tr_block_index_t block = begin; block < end && n_blocks + std::size(blocks) < n_wanted_blocks; ++block)
{ {
// don't request blocks we've already got // don't request blocks we've already got
if (!mediator.clientCanRequestBlock(block)) if (!mediator.clientCanRequestBlock(block))
@ -173,27 +170,10 @@ std::vector<tr_block_span_t> Wishlist::next(Wishlist::Mediator const& mediator,
continue; continue;
} }
blocks.push_back(block); blocks.insert(block);
}
if (std::empty(blocks))
{
continue;
}
// copy the spans into `spans`
auto const tmp = makeSpans(std::data(blocks), std::size(blocks));
std::copy(std::begin(tmp), std::end(tmp), std::back_inserter(spans));
n_blocks += std::accumulate(
std::begin(tmp),
std::end(tmp),
size_t{},
[](size_t sum, auto span) { return sum + span.end - span.begin; });
if (n_blocks >= n_wanted_blocks)
{
break;
} }
} }
return spans; auto const blocks_v = std::vector<tr_block_index_t>{ std::begin(blocks), std::end(blocks) };
return makeSpans(std::data(blocks_v), std::size(blocks_v));
} }

View File

@ -8,6 +8,7 @@
#include <cstring> #include <cstring>
#include <ctime> #include <ctime>
#include <iterator> #include <iterator>
#include <map>
#include <memory> // std::unique_ptr #include <memory> // std::unique_ptr
#include <optional> #include <optional>
#include <vector> #include <vector>
@ -188,10 +189,10 @@ static peer_request blockToReq(tr_torrent const* tor, tr_block_index_t block)
* the current message that it's sending us. */ * the current message that it's sending us. */
struct tr_incoming struct tr_incoming
{ {
uint8_t id = 0; uint8_t id = 0; // the protocol message, e.g. BtPeerMsgs::Piece
uint32_t length = 0; /* includes the +1 for id length */ uint32_t length = 0; // the full message payload length. Includes the +1 for id length
struct peer_request blockReq = {}; /* metadata for incoming blocks */ std::optional<peer_request> block_req; // metadata for incoming blocks
struct evbuffer* block = nullptr; /* piece data for incoming blocks */ std::map<tr_block_index_t, std::unique_ptr<std::vector<uint8_t>>> block_buf; // piece data for incoming blocks
}; };
class tr_peerMsgsImpl; class tr_peerMsgsImpl;
@ -285,11 +286,6 @@ public:
set_active(TR_UP, false); set_active(TR_UP, false);
set_active(TR_DOWN, false); set_active(TR_DOWN, false);
if (this->incoming.block != nullptr)
{
evbuffer_free(this->incoming.block);
}
if (this->io != nullptr) if (this->io != nullptr)
{ {
tr_peerIoClear(this->io); tr_peerIoClear(this->io);
@ -449,13 +445,14 @@ public:
publish(e); publish(e);
} }
void publishGotBlock(struct peer_request const* req) void publishGotBlock(tr_block_index_t block)
{ {
auto const loc = torrent->blockLoc(block);
auto e = tr_peer_event{}; auto e = tr_peer_event{};
e.eventType = TR_PEER_CLIENT_GOT_BLOCK; e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
e.pieceIndex = req->index; e.pieceIndex = loc.piece;
e.offset = req->offset; e.offset = loc.piece_offset;
e.length = req->length; e.length = torrent->blockSize(block);
publish(e); publish(e);
} }
@ -546,6 +543,11 @@ public:
publish(e); publish(e);
} }
[[nodiscard]] bool isValidRequest(peer_request const& req) const
{
return tr_torrentReqIsValid(torrent, req.index, req.offset, req.length);
}
private: private:
[[nodiscard]] bool calculate_active(tr_direction direction) const [[nodiscard]] bool calculate_active(tr_direction direction) const
{ {
@ -735,8 +737,9 @@ static void protocolSendReject(tr_peerMsgsImpl* msgs, struct peer_request const*
static void protocolSendRequest(tr_peerMsgsImpl* msgs, struct peer_request const& req) static void protocolSendRequest(tr_peerMsgsImpl* msgs, struct peer_request const& req)
{ {
struct evbuffer* out = msgs->outMessages; TR_ASSERT(msgs->isValidRequest(req));
auto* const out = msgs->outMessages;
evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t)); evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t));
evbuffer_add_uint8(out, BtPeerMsgs::Request); evbuffer_add_uint8(out, BtPeerMsgs::Request);
evbuffer_add_uint32(out, req.index); evbuffer_add_uint32(out, req.index);
@ -978,20 +981,6 @@ static void cancelAllRequestsToClient(tr_peerMsgsImpl* msgs)
*** ***
**/ **/
static bool reqIsValid(tr_peerMsgsImpl const* peer, uint32_t index, uint32_t offset, uint32_t length)
{
return tr_torrentReqIsValid(peer->torrent, index, offset, length);
}
static bool requestIsValid(tr_peerMsgsImpl const* msgs, struct peer_request const* req)
{
return reqIsValid(msgs, req->index, req->offset, req->length);
}
/**
***
**/
static void sendLtepHandshake(tr_peerMsgsImpl* msgs) static void sendLtepHandshake(tr_peerMsgsImpl* msgs)
{ {
evbuffer* const out = msgs->outMessages; evbuffer* const out = msgs->outMessages;
@ -1543,7 +1532,7 @@ static bool messageLengthIsCorrect(tr_peerMsgsImpl const* msg, uint8_t id, uint3
} }
} }
static int clientGotBlock(tr_peerMsgsImpl* msgs, struct evbuffer* block, struct peer_request const* req); static int clientGotBlock(tr_peerMsgsImpl* msgs, std::unique_ptr<std::vector<uint8_t>>& block_data, tr_block_index_t block);
static ReadState readBtPiece(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen, size_t* setme_piece_bytes_read) static ReadState readBtPiece(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size_t inlen, size_t* setme_piece_bytes_read)
{ {
@ -1551,61 +1540,82 @@ static ReadState readBtPiece(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, size
logtrace(msgs, "In readBtPiece"); logtrace(msgs, "In readBtPiece");
struct peer_request* req = &msgs->incoming.blockReq; // If this is the first we've seen of the piece data, parse out the header
if (!msgs->incoming.block_req)
if (req->length == 0)
{ {
if (inlen < 8) if (inlen < 8)
{ {
return READ_LATER; return READ_LATER;
} }
tr_peerIoReadUint32(msgs->io, inbuf, &req->index); auto req = peer_request{};
tr_peerIoReadUint32(msgs->io, inbuf, &req->offset); tr_peerIoReadUint32(msgs->io, inbuf, &req.index);
req->length = msgs->incoming.length - 9; tr_peerIoReadUint32(msgs->io, inbuf, &req.offset);
logtrace( req.length = msgs->incoming.length - 9;
msgs, logtrace(msgs, fmt::format(FMT_STRING("got incoming block header {:d}:{:d}->{:d}"), req.index, req.offset, req.length));
fmt::format(FMT_STRING("got incoming block header {:d}:{:d}->{:d}"), req->index, req->offset, req->length)); msgs->incoming.block_req = req;
return READ_NOW; return READ_NOW;
} }
if (msgs->incoming.block == nullptr) auto& req = msgs->incoming.block_req;
auto const loc = msgs->torrent->pieceLoc(req->index, req->offset);
auto const block = loc.block;
auto const block_size = msgs->torrent->blockSize(block);
auto& block_buf = msgs->incoming.block_buf[block];
if (!block_buf)
{ {
msgs->incoming.block = evbuffer_new(); block_buf = std::make_unique<std::vector<uint8_t>>();
block_buf->reserve(block_size);
} }
struct evbuffer* const block_buffer = msgs->incoming.block; // read in another chunk of data
auto const n_left_in_block = block_size - std::size(*block_buf);
auto const n_left_in_req = size_t{ req->length };
auto const n_to_read = std::min({ n_left_in_block, n_left_in_req, inlen });
auto const old_length = std::size(*block_buf);
block_buf->resize(old_length + n_to_read);
tr_peerIoReadBytes(msgs->io, inbuf, &((*block_buf)[old_length]), n_to_read);
/* read in another chunk of data */ msgs->publishClientGotPieceData(n_to_read);
size_t const nLeft = req->length - evbuffer_get_length(block_buffer); *setme_piece_bytes_read += n_to_read;
size_t const n = std::min(nLeft, inlen);
tr_peerIoReadBytesToBuf(msgs->io, inbuf, block_buffer, n);
msgs->publishClientGotPieceData(n);
*setme_piece_bytes_read += n;
logtrace( logtrace(
msgs, msgs,
fmt::format( fmt::format(
FMT_STRING("got {:d} bytes for block {:d}:{:d}->{:d} ... {:d} remain"), FMT_STRING("got {:d} bytes for block {:d}:{:d}->{:d} ... {:d} remain in req, {:d} remain in block"),
n, n_to_read,
req->index, req->index,
req->offset, req->offset,
req->length, req->length,
req->length - evbuffer_get_length(block_buffer))); req->length,
block_size - std::size(*block_buf)));
if (evbuffer_get_length(block_buffer) < req->length) // if we didn't read enough to finish off the request,
// update the table and wait for more
if (n_to_read < n_left_in_req)
{
auto new_loc = msgs->torrent->byteLoc(loc.byte + n_to_read);
req->index = new_loc.piece;
req->offset = new_loc.piece_offset;
req->length -= n_to_read;
return READ_LATER;
}
// we've fully read this message
req.reset();
msgs->state = AwaitingBt::Length;
// if we didn't read enough to finish off the block,
// update the table and wait for more
if (std::size(*block_buf) < block_size)
{ {
return READ_LATER; return READ_LATER;
} }
/* pass the block along... */ // pass the block along...
int const err = clientGotBlock(msgs, block_buffer, req); int const err = clientGotBlock(msgs, block_buf, block);
evbuffer_drain(block_buffer, evbuffer_get_length(block_buffer)); msgs->incoming.block_buf.erase(block);
/* cleanup */ // cleanup
req->length = 0;
msgs->state = AwaitingBt::Length;
return err != 0 ? READ_ERR : READ_NOW; return err != 0 ? READ_ERR : READ_NOW;
} }
@ -1873,57 +1883,47 @@ static ReadState readBtMessage(tr_peerMsgsImpl* msgs, struct evbuffer* inbuf, si
} }
/* returns 0 on success, or an errno on failure */ /* returns 0 on success, or an errno on failure */
static int clientGotBlock(tr_peerMsgsImpl* msgs, struct evbuffer* data, struct peer_request const* req) static int clientGotBlock(
tr_peerMsgsImpl* msgs,
std::unique_ptr<std::vector<uint8_t>>& block_data,
tr_block_index_t const block)
{ {
TR_ASSERT(msgs != nullptr); TR_ASSERT(msgs != nullptr);
TR_ASSERT(req != nullptr);
tr_torrent* const tor = msgs->torrent; tr_torrent* const tor = msgs->torrent;
auto const block = tor->pieceLoc(req->index, req->offset).block;
if (!requestIsValid(msgs, req)) if (!block_data || std::size(*block_data) != msgs->torrent->blockSize(block))
{
logdbg(msgs, fmt::format(FMT_STRING("dropping invalid block {:d}:{:d}->{:d}"), req->index, req->offset, req->length));
return EBADMSG;
}
if (req->length != msgs->torrent->blockSize(block))
{ {
logdbg( logdbg(
msgs, msgs,
fmt::format( fmt::format(
FMT_STRING("wrong block size -- expected {:d}, got {:d}"), FMT_STRING("wrong block size -- expected {:d}, got {:d}"),
msgs->torrent->blockSize(block), msgs->torrent->blockSize(block),
req->length)); block_data ? std::size(*block_data) : 0U));
block_data->clear();
return EMSGSIZE; return EMSGSIZE;
} }
logtrace(msgs, fmt::format(FMT_STRING("got block {:d}:{:d}->{:d}"), req->index, req->offset, req->length)); logtrace(msgs, fmt::format(FMT_STRING("got block {:d}"), block));
if (!tr_peerMgrDidPeerRequest(msgs->torrent, msgs, block)) if (!tr_peerMgrDidPeerRequest(msgs->torrent, msgs, block))
{ {
logdbg(msgs, "we didn't ask for this message..."); logdbg(msgs, "we didn't ask for this message...");
block_data->clear();
return 0; return 0;
} }
if (msgs->torrent->hasPiece(req->index)) auto const loc = msgs->torrent->blockLoc(block);
if (msgs->torrent->hasPiece(loc.piece))
{ {
logtrace(msgs, "we did ask for this message, but the piece is already complete..."); logtrace(msgs, "we did ask for this message, but the piece is already complete...");
block_data->clear();
return 0; return 0;
} }
/** msgs->session->cache->writeBlock(tor->id(), block, block_data);
*** Save the block msgs->blame.set(loc.piece);
**/ msgs->publishGotBlock(block);
if (int const err = msgs->session->cache->writeBlock(tor, tor->pieceLoc(req->index, req->offset), req->length, data);
err != 0)
{
return err;
}
msgs->blame.set(req->index);
msgs->publishGotBlock(req);
return 0; return 0;
} }
@ -2066,12 +2066,14 @@ static void updateMetadataRequests(tr_peerMsgsImpl* msgs, time_t now)
static void updateBlockRequests(tr_peerMsgsImpl* msgs) static void updateBlockRequests(tr_peerMsgsImpl* msgs)
{ {
if (!msgs->torrent->clientCanDownload()) auto* const tor = msgs->torrent;
if (!tor->clientCanDownload())
{ {
return; return;
} }
auto const n_active = tr_peerMgrCountActiveRequestsToPeer(msgs->torrent, msgs); auto const n_active = tr_peerMgrCountActiveRequestsToPeer(tor, msgs);
if (n_active >= msgs->desired_request_count) if (n_active >= msgs->desired_request_count)
{ {
return; return;
@ -2086,14 +2088,28 @@ static void updateBlockRequests(tr_peerMsgsImpl* msgs)
TR_ASSERT(msgs->is_client_interested()); TR_ASSERT(msgs->is_client_interested());
TR_ASSERT(!msgs->is_client_choked()); TR_ASSERT(!msgs->is_client_choked());
for (auto const span : tr_peerMgrGetNextRequests(msgs->torrent, msgs, n_wanted)) for (auto const span : tr_peerMgrGetNextRequests(tor, msgs, n_wanted))
{ {
for (tr_block_index_t block = span.begin; block < span.end; ++block) for (tr_block_index_t block = span.begin; block < span.end; ++block)
{ {
protocolSendRequest(msgs, blockToReq(msgs->torrent, block)); // Note that requests can't cross over a piece boundary.
// So if a piece isn't evenly divisible by the block size,
// we need to split our block request info per-piece chunks.
auto const byte_begin = tor->blockLoc(block).byte;
auto const block_size = tor->blockSize(block);
auto const byte_end = byte_begin + block_size;
for (auto offset = byte_begin; offset < byte_end;)
{
auto const loc = tor->byteLoc(offset);
auto const left_in_block = block_size - loc.block_offset;
auto const left_in_piece = tor->pieceSize(loc.piece) - loc.piece_offset;
auto const req_len = std::min(left_in_block, left_in_piece);
protocolSendRequest(msgs, { loc.piece, loc.piece_offset, req_len });
offset += req_len;
}
} }
tr_peerMgrClientSentRequests(msgs->torrent, msgs, span); tr_peerMgrClientSentRequests(tor, msgs, span);
} }
} }
@ -2199,7 +2215,7 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now)
req = msgs->peer_requested_.front(); req = msgs->peer_requested_.front();
msgs->peer_requested_.erase(std::begin(msgs->peer_requested_)); msgs->peer_requested_.erase(std::begin(msgs->peer_requested_));
if (requestIsValid(msgs, &req) && msgs->torrent->hasPiece(req.index)) if (msgs->isValidRequest(req) && msgs->torrent->hasPiece(req.index))
{ {
uint32_t const msglen = 4 + 1 + 4 + 4 + req.length; uint32_t const msglen = 4 + 1 + 4 + 4 + req.length;
struct evbuffer_iovec iovec[1]; struct evbuffer_iovec iovec[1];

View File

@ -279,6 +279,11 @@ public:
return fpm_.fileOffset(loc.byte); return fpm_.fileOffset(loc.byte);
} }
[[nodiscard]] auto byteSpan(tr_file_index_t file) const
{
return fpm_.byteSpan(file);
}
/// WANTED /// WANTED
[[nodiscard]] bool pieceIsWanted(tr_piece_index_t piece) const final [[nodiscard]] bool pieceIsWanted(tr_piece_index_t piece) const final

View File

@ -5,6 +5,7 @@
#include <algorithm> #include <algorithm>
#include <iterator> #include <iterator>
#include <memory>
#include <set> #include <set>
#include <string> #include <string>
#include <string_view> #include <string_view>
@ -229,11 +230,11 @@ public:
} }
} }
void publishGotBlock(tr_torrent const* tor, tr_block_info::Location loc) void publishGotBlock(tr_torrent const* tor, tr_block_index_t block)
{ {
TR_ASSERT(loc.block_offset == 0); TR_ASSERT(block < tor->blockCount());
TR_ASSERT(loc.block < tor->blockCount());
auto const loc = tor->blockLoc(block);
auto e = tr_peer_event{}; auto e = tr_peer_event{};
e.eventType = TR_PEER_CLIENT_GOT_BLOCK; e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
e.pieceIndex = loc.piece; e.pieceIndex = loc.piece;
@ -295,44 +296,36 @@ private:
public: public:
write_block_data( write_block_data(
tr_session* session_in, tr_session* session,
tr_torrent_id_t torrent_id_in, tr_torrent_id_t tor_id,
tr_webseed* webseed_in, tr_block_index_t block,
tr_block_info::Location loc_in) std::unique_ptr<std::vector<uint8_t>>& data,
: session{ session_in } tr_webseed* webseed)
, torrent_id{ torrent_id_in } : session_{ session }
, webseed{ webseed_in } , tor_id_{ tor_id }
, loc{ loc_in } , block_{ block }
, data_{ std::move(data) }
, webseed_{ webseed }
{ {
TR_ASSERT(loc.block_offset == 0);
}
[[nodiscard]] auto* content() const
{
return content_.get();
} }
void write_block_func() void write_block_func()
{ {
auto* const buf = this->content(); if (auto* const tor = tr_torrentFindFromId(session_, tor_id_); tor != nullptr)
if (auto* const tor = tr_torrentFindFromId(this->session, this->torrent_id); tor != nullptr)
{ {
auto const len = evbuffer_get_length(buf); session_->cache->writeBlock(tor_id_, block_, data_);
TR_ASSERT(tor->blockSize(this->loc.block) == len); webseed_->publishGotBlock(tor, block_);
tor->session->cache->writeBlock(tor, this->loc, len, buf);
webseed->publishGotBlock(tor, this->loc);
TR_ASSERT(evbuffer_get_length(buf) == 0);
} }
delete this; delete this;
} }
private: private:
tr_session* const session; tr_session* const session_;
tr_torrent_id_t const torrent_id; tr_torrent_id_t const tor_id_;
tr_webseed* const webseed; tr_block_index_t const block_;
tr_block_info::Location const loc; std::unique_ptr<std::vector<uint8_t>> data_;
tr_webseed* const webseed_;
}; };
void useFetchedBlocks(tr_webseed_task* task) void useFetchedBlocks(tr_webseed_task* task)
@ -362,8 +355,10 @@ void useFetchedBlocks(tr_webseed_task* task)
} }
else else
{ {
auto* const data = new write_block_data{ session, tor->id(), webseed, task->loc }; auto block_buf = std::make_unique<std::vector<uint8_t>>();
evbuffer_remove_buffer(task->content(), data->content(), block_size); block_buf->resize(block_size);
evbuffer_remove(task->content(), std::data(*block_buf), std::size(*block_buf));
auto* const data = new write_block_data{ session, tor->id(), task->loc.block, block_buf, webseed };
tr_runInEventThread(session, &write_block_data::write_block_func, data); tr_runInEventThread(session, &write_block_data::write_block_func, data);
} }

View File

@ -4,6 +4,7 @@
// License text can be found in the licenses/ folder. // License text can be found in the licenses/ folder.
#include <iostream> #include <iostream>
#include <memory>
#include <string> #include <string>
#include <string_view> #include <string_view>
#include <utility> #include <utility>
@ -81,36 +82,30 @@ TEST_P(IncompleteDirTest, incompleteDir)
tr_torrent* tor = {}; tr_torrent* tor = {};
tr_block_index_t block = {}; tr_block_index_t block = {};
tr_piece_index_t pieceIndex = {}; tr_piece_index_t pieceIndex = {};
uint32_t offset = {}; std::unique_ptr<std::vector<uint8_t>> buf = {};
struct evbuffer* buf = {};
bool done = {}; bool done = {};
}; };
auto const test_incomplete_dir_threadfunc = [](void* vdata) noexcept auto const test_incomplete_dir_threadfunc = [](TestIncompleteDirData* data) noexcept
{ {
auto* data = static_cast<TestIncompleteDirData*>(vdata); data->session->cache->writeBlock(data->tor->id(), data->block, data->buf);
data->session->cache->writeBlock(data->tor, data->tor->pieceLoc(0, data->offset), tr_block_info::BlockSize, data->buf);
tr_torrentGotBlock(data->tor, data->block); tr_torrentGotBlock(data->tor, data->block);
data->done = true; data->done = true;
}; };
// now finish writing it // now finish writing it
{ {
auto* const zero_block = tr_new0(char, tr_block_info::BlockSize); auto data = TestIncompleteDirData{};
struct TestIncompleteDirData data = {};
data.session = session_; data.session = session_;
data.tor = tor; data.tor = tor;
data.buf = evbuffer_new();
auto const [begin, end] = tor->blockSpanForPiece(data.pieceIndex); auto const [begin, end] = tor->blockSpanForPiece(data.pieceIndex);
for (tr_block_index_t block_index = begin; block_index < end; ++block_index) for (tr_block_index_t block_index = begin; block_index < end; ++block_index)
{ {
evbuffer_add(data.buf, zero_block, tr_block_info::BlockSize); data.buf = std::make_unique<std::vector<uint8_t>>(tr_block_info::BlockSize, '\0');
data.block = block_index; data.block = block_index;
data.done = false; data.done = false;
data.offset = data.block * tr_block_info::BlockSize;
tr_runInEventThread(session_, test_incomplete_dir_threadfunc, &data); tr_runInEventThread(session_, test_incomplete_dir_threadfunc, &data);
auto const test = [&data]() auto const test = [&data]()
@ -119,9 +114,6 @@ TEST_P(IncompleteDirTest, incompleteDir)
}; };
EXPECT_TRUE(waitFor(test, MaxWaitMsec)); EXPECT_TRUE(waitFor(test, MaxWaitMsec));
} }
evbuffer_free(data.buf);
tr_free(zero_block);
} }
blockingTorrentVerify(tor); blockingTorrentVerify(tor);