refactor: add tr_peer.requestBlocks() (#3384)

Charles Kerr 2022-07-02 14:42:16 -05:00 committed by GitHub
parent 5e43731bcf
commit e7627dc209
3 changed files with 144 additions and 116 deletions


@ -87,6 +87,8 @@ public:
// requests that have been made but haven't been fulfilled yet
[[nodiscard]] virtual size_t activeReqCount(tr_direction) const noexcept = 0;
virtual void requestBlocks(tr_block_span_t const* block_spans, size_t n_spans) = 0;
tr_session* const session;
tr_swarm* const swarm;
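
Note on the new pure virtual above: tr_block_span_t is a half-open [begin, end) range of block indices (its begin/end members are visible in the loops further down in this commit). A minimal caller sketch, not part of the commit, with hypothetical block indices:

// Sketch, not from the commit: driving the new interface.
// Assumes tr_block_span_t is an aggregate of { begin, end } block indices,
// as suggested by the structured bindings and .begin/.end uses below.
auto const spans = std::array<tr_block_span_t, 2>{ { { 100, 104 }, { 200, 201 } } };
peer->requestBlocks(std::data(spans), std::size(spans)); // `peer` is any tr_peer*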


@ -223,6 +223,23 @@ struct EventDeleter
using UniqueTimer = std::unique_ptr<struct event, EventDeleter>;
#define myLogMacro(msgs, level, text) \
do \
{ \
if (tr_logGetLevel() >= (level)) \
{ \
tr_logAddMessage( \
__FILE__, \
__LINE__, \
(level), \
fmt::format(FMT_STRING("{:s} [{:s}]: {:s}"), (msgs)->io->addrStr(), (msgs)->client, text), \
(msgs)->torrent->name()); \
} \
} while (0)
#define logdbg(msgs, text) myLogMacro(msgs, TR_LOG_DEBUG, text)
#define logtrace(msgs, text) myLogMacro(msgs, TR_LOG_TRACE, text)
/**
* Low-level communication state information about a connected peer.
*
@ -296,6 +313,20 @@ public:
evbuffer_free(this->outMessages);
}
void dbgOutMessageLen() const
{
logtrace(this, fmt::format(FMT_STRING("outMessage size is now {:d}"), evbuffer_get_length(outMessages)));
}
void pokeBatchPeriod(int interval)
{
if (outMessagesBatchPeriod > interval)
{
outMessagesBatchPeriod = interval;
logtrace(this, fmt::format(FMT_STRING("lowering batch interval to {:d} seconds"), interval));
}
}
bool isTransferringPieces(uint64_t now, tr_direction direction, unsigned int* setme_Bps) const override
{
auto const Bps = io->getPieceSpeed_Bps(now, direction);
@ -596,7 +627,54 @@ public:
return tr_torrentReqIsValid(torrent, req.index, req.offset, req.length);
}
void requestBlocks(tr_block_span_t const* block_spans, size_t n_spans) override
{
TR_ASSERT(torrent->clientCanDownload());
TR_ASSERT(is_client_interested());
TR_ASSERT(!is_client_choked());
for (auto const *span = block_spans, *span_end = span + n_spans; span != span_end; ++span)
{
for (auto [block, block_end] = *span; block < block_end; ++block)
{
// Note that requests can't cross over a piece boundary.
// So if a piece isn't evenly divisible by the block size,
// we need to split our block request into per-piece chunks.
auto const byte_begin = torrent->blockLoc(block).byte;
auto const block_size = torrent->blockSize(block);
auto const byte_end = byte_begin + block_size;
for (auto offset = byte_begin; offset < byte_end;)
{
auto const loc = torrent->byteLoc(offset);
auto const left_in_block = block_size - loc.block_offset;
auto const left_in_piece = torrent->pieceSize(loc.piece) - loc.piece_offset;
auto const req_len = std::min(left_in_block, left_in_piece);
protocolSendRequest({ loc.piece, loc.piece_offset, req_len });
offset += req_len;
}
}
tr_peerMgrClientSentRequests(torrent, this, *span);
}
}
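
A worked sketch of the splitting arithmetic above, using hypothetical sizes (illustrative numbers, not from the commit): a 16 KiB block whose bytes straddle a piece boundary gets split into two requests.

// Sketch, not from the commit: one block covering bytes [40960, 57344)
// with a piece boundary at byte 49152, i.e. 8 KiB left in piece N.
auto constexpr block_size = uint32_t{ 16384 };
auto constexpr left_in_piece = uint32_t{ 8192 };             // bytes left in piece N at the block start
auto const first_len = std::min(block_size, left_in_piece);  // 8192: tail of piece N
auto const second_len = block_size - first_len;              // 8192: head of piece N + 1
// The loop above would call protocolSendRequest({ N, piece_offset, first_len })
// followed by protocolSendRequest({ N + 1, 0, second_len }).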
private:
void protocolSendRequest(struct peer_request const& req)
{
TR_ASSERT(isValidRequest(req));
auto* const out = outMessages;
evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t));
evbuffer_add_uint8(out, BtPeerMsgs::Request);
evbuffer_add_uint32(out, req.index);
evbuffer_add_uint32(out, req.offset);
evbuffer_add_uint32(out, req.length);
logtrace(this, fmt::format(FMT_STRING("requesting {:d}:{:d}->{:d}..."), req.index, req.offset, req.length));
dbgOutMessageLen();
pokeBatchPeriod(ImmediatePriorityIntervalSecs);
}
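
For reference, the bytes written above form the standard BitTorrent request message from BEP 3 (<len=13><id=6><index><begin><length>); the length prefix of 13 is sizeof(uint8_t) + 3 * sizeof(uint32_t). A byte-layout sketch for a hypothetical request { index = 2, offset = 16384, length = 16384 }, assuming the evbuffer_add_uint32() helper writes big-endian as the protocol requires:

// Sketch, not from the commit: wire layout of the message built above.
//   00 00 00 0d   length prefix = 13
//   06            BtPeerMsgs::Request (BEP 3 message id 6)
//   00 00 00 02   index  = 2
//   00 00 40 00   begin  = 16384
//   00 00 40 00   length = 16384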
[[nodiscard]] float calculatePercentDone() const noexcept
{
if (have_.hasAll())
@ -754,41 +832,6 @@ tr_peerMsgs* tr_peerMsgsNew(tr_torrent* torrent, peer_atom* atom, tr_peerIo* io,
***
**/
#define myLogMacro(msgs, level, text) \
do \
{ \
if (tr_logGetLevel() >= (level)) \
{ \
tr_logAddMessage( \
__FILE__, \
__LINE__, \
(level), \
fmt::format(FMT_STRING("{:s} [{:s}]: {:s}"), (msgs)->io->addrStr(), (msgs)->client, text), \
(msgs)->torrent->name()); \
} \
} while (0)
#define logdbg(msgs, text) myLogMacro(msgs, TR_LOG_DEBUG, text)
#define logtrace(msgs, text) myLogMacro(msgs, TR_LOG_TRACE, text)
/**
***
**/
static void pokeBatchPeriod(tr_peerMsgsImpl* msgs, int interval)
{
if (msgs->outMessagesBatchPeriod > interval)
{
msgs->outMessagesBatchPeriod = interval;
logtrace(msgs, fmt::format(FMT_STRING("lowering batch interval to {:d} seconds"), interval));
}
}
static void dbgOutMessageLen(tr_peerMsgsImpl* msgs)
{
logtrace(msgs, fmt::format(FMT_STRING("outMessage size is now {:d}"), evbuffer_get_length(msgs->outMessages)));
}
static void protocolSendReject(tr_peerMsgsImpl* msgs, struct peer_request const* req)
{
TR_ASSERT(msgs->io->supportsFEXT());
@ -802,23 +845,7 @@ static void protocolSendReject(tr_peerMsgsImpl* msgs, struct peer_request const*
evbuffer_add_uint32(out, req->length);
logtrace(msgs, fmt::format(FMT_STRING("rejecting {:d}:{:d}->{:d}..."), req->index, req->offset, req->length));
dbgOutMessageLen(msgs);
}
static void protocolSendRequest(tr_peerMsgsImpl* msgs, struct peer_request const& req)
{
TR_ASSERT(msgs->isValidRequest(req));
auto* const out = msgs->outMessages;
evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t));
evbuffer_add_uint8(out, BtPeerMsgs::Request);
evbuffer_add_uint32(out, req.index);
evbuffer_add_uint32(out, req.offset);
evbuffer_add_uint32(out, req.length);
logtrace(msgs, fmt::format(FMT_STRING("requesting {:d}:{:d}->{:d}..."), req.index, req.offset, req.length));
dbgOutMessageLen(msgs);
pokeBatchPeriod(msgs, ImmediatePriorityIntervalSecs);
msgs->dbgOutMessageLen();
}
static void protocolSendCancel(tr_peerMsgsImpl* msgs, peer_request const& req)
@ -832,8 +859,8 @@ static void protocolSendCancel(tr_peerMsgsImpl* msgs, peer_request const& req)
evbuffer_add_uint32(out, req.length);
logtrace(msgs, fmt::format(FMT_STRING("cancelling {:d}:{:d}->{:d}..."), req.index, req.offset, req.length));
dbgOutMessageLen(msgs);
pokeBatchPeriod(msgs, ImmediatePriorityIntervalSecs);
msgs->dbgOutMessageLen();
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
}
static void protocolSendPort(tr_peerMsgsImpl* msgs, tr_port port)
@ -855,8 +882,8 @@ static void protocolSendHave(tr_peerMsgsImpl* msgs, tr_piece_index_t index)
evbuffer_add_uint32(out, index);
logtrace(msgs, fmt::format(FMT_STRING("sending Have {:d}"), index));
dbgOutMessageLen(msgs);
pokeBatchPeriod(msgs, LowPriorityIntervalSecs);
msgs->dbgOutMessageLen();
msgs->pokeBatchPeriod(LowPriorityIntervalSecs);
}
#if 0
@ -873,7 +900,7 @@ static void protocolSendAllowedFast(tr_peerMsgs* msgs, uint32_t pieceIndex)
evbuffer_add_uint32(io, out, pieceIndex);
logtrace(msgs, "sending Allowed Fast %u...", pieceIndex);
dbgOutMessageLen(msgs);
msgs->dbgOutMessageLen();
}
#endif
@ -886,8 +913,8 @@ static void protocolSendChoke(tr_peerMsgsImpl* msgs, bool choke)
evbuffer_add_uint8(out, choke ? BtPeerMsgs::Choke : BtPeerMsgs::Unchoke);
logtrace(msgs, choke ? "sending choke" : "sending unchoked");
dbgOutMessageLen(msgs);
pokeBatchPeriod(msgs, ImmediatePriorityIntervalSecs);
msgs->dbgOutMessageLen();
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
}
static void protocolSendHaveAll(tr_peerMsgsImpl* msgs)
@ -900,8 +927,8 @@ static void protocolSendHaveAll(tr_peerMsgsImpl* msgs)
evbuffer_add_uint8(out, BtPeerMsgs::FextHaveAll);
logtrace(msgs, "sending HAVE_ALL...");
dbgOutMessageLen(msgs);
pokeBatchPeriod(msgs, ImmediatePriorityIntervalSecs);
msgs->dbgOutMessageLen();
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
}
static void protocolSendHaveNone(tr_peerMsgsImpl* msgs)
@ -914,8 +941,8 @@ static void protocolSendHaveNone(tr_peerMsgsImpl* msgs)
evbuffer_add_uint8(out, BtPeerMsgs::FextHaveNone);
logtrace(msgs, "sending HAVE_NONE...");
dbgOutMessageLen(msgs);
pokeBatchPeriod(msgs, ImmediatePriorityIntervalSecs);
msgs->dbgOutMessageLen();
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
}
/**
@ -1015,8 +1042,8 @@ static void sendInterest(tr_peerMsgsImpl* msgs, bool b)
evbuffer_add_uint32(out, sizeof(uint8_t));
evbuffer_add_uint8(out, b ? BtPeerMsgs::Interested : BtPeerMsgs::NotInterested);
pokeBatchPeriod(msgs, HighPriorityIntervalSecs);
dbgOutMessageLen(msgs);
msgs->pokeBatchPeriod(HighPriorityIntervalSecs);
msgs->dbgOutMessageLen();
}
static bool popNextMetadataRequest(tr_peerMsgsImpl* msgs, int* piece)
@ -1154,8 +1181,8 @@ static void sendLtepHandshake(tr_peerMsgsImpl* msgs)
evbuffer_add_uint8(out, BtPeerMsgs::Ltep);
evbuffer_add_uint8(out, LtepMessages::Handshake);
evbuffer_add_buffer(out, payload);
pokeBatchPeriod(msgs, ImmediatePriorityIntervalSecs);
dbgOutMessageLen(msgs);
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
msgs->dbgOutMessageLen();
/* cleanup */
evbuffer_free(payload);
@ -1327,8 +1354,8 @@ static void parseUtMetadata(tr_peerMsgsImpl* msgs, uint32_t msglen, struct evbuf
evbuffer_add_uint8(out, BtPeerMsgs::Ltep);
evbuffer_add_uint8(out, msgs->ut_metadata_id);
evbuffer_add_buffer(out, payload);
pokeBatchPeriod(msgs, HighPriorityIntervalSecs);
dbgOutMessageLen(msgs);
msgs->pokeBatchPeriod(HighPriorityIntervalSecs);
msgs->dbgOutMessageLen();
/* cleanup */
evbuffer_free(payload);
@ -2118,8 +2145,8 @@ static void updateMetadataRequests(tr_peerMsgsImpl* msgs, time_t now)
evbuffer_add_uint8(out, BtPeerMsgs::Ltep);
evbuffer_add_uint8(out, msgs->ut_metadata_id);
evbuffer_add_buffer(out, payload);
pokeBatchPeriod(msgs, HighPriorityIntervalSecs);
dbgOutMessageLen(msgs);
msgs->pokeBatchPeriod(HighPriorityIntervalSecs);
msgs->dbgOutMessageLen();
/* cleanup */
evbuffer_free(payload);
@ -2151,28 +2178,9 @@ static void updateBlockRequests(tr_peerMsgsImpl* msgs)
TR_ASSERT(msgs->is_client_interested());
TR_ASSERT(!msgs->is_client_choked());
for (auto const span : tr_peerMgrGetNextRequests(tor, msgs, n_wanted))
if (auto const requests = tr_peerMgrGetNextRequests(tor, msgs, n_wanted); !std::empty(requests))
{
for (tr_block_index_t block = span.begin; block < span.end; ++block)
{
// Note that requests can't cross over a piece boundary.
// So if a piece isn't evenly divisible by the block size,
// we need to split our block request into per-piece chunks.
auto const byte_begin = tor->blockLoc(block).byte;
auto const block_size = tor->blockSize(block);
auto const byte_end = byte_begin + block_size;
for (auto offset = byte_begin; offset < byte_end;)
{
auto const loc = tor->byteLoc(offset);
auto const left_in_block = block_size - loc.block_offset;
auto const left_in_piece = tor->pieceSize(loc.piece) - loc.piece_offset;
auto const req_len = std::min(left_in_block, left_in_piece);
protocolSendRequest(msgs, { loc.piece, loc.piece_offset, req_len });
offset += req_len;
}
}
tr_peerMgrClientSentRequests(tor, msgs, span);
msgs->requestBlocks(std::data(requests), std::size(requests));
}
}
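
The call shapes here and in the webseed path below (std::data/std::size on the result, plus resize()) suggest tr_peerMgrGetNextRequests() returns a std::vector<tr_block_span_t>; under that assumption, both kinds of peer now consume its output the same way, which is the point of moving the per-piece splitting behind tr_peer::requestBlocks().

// Sketch under the assumption above; `peer` stands in for any tr_peer*,
// whether a BitTorrent peer or a webseed.
if (auto const requests = tr_peerMgrGetNextRequests(tor, peer, n_wanted); !std::empty(requests))
{
    peer->requestBlocks(std::data(requests), std::size(requests));
}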
@ -2235,8 +2243,8 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now)
evbuffer_add_uint8(out, msgs->ut_metadata_id);
evbuffer_add_buffer(out, payload);
evbuffer_add(out, data, dataLen);
pokeBatchPeriod(msgs, HighPriorityIntervalSecs);
dbgOutMessageLen(msgs);
msgs->pokeBatchPeriod(HighPriorityIntervalSecs);
msgs->dbgOutMessageLen();
evbuffer_free(payload);
tr_variantFree(&tmp);
@ -2261,8 +2269,8 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now)
evbuffer_add_uint8(out, BtPeerMsgs::Ltep);
evbuffer_add_uint8(out, msgs->ut_metadata_id);
evbuffer_add_buffer(out, payload);
pokeBatchPeriod(msgs, HighPriorityIntervalSecs);
dbgOutMessageLen(msgs);
msgs->pokeBatchPeriod(HighPriorityIntervalSecs);
msgs->dbgOutMessageLen();
evbuffer_free(payload);
tr_variantFree(&tmp);
@ -2356,7 +2364,7 @@ static size_t fillOutputBuffer(tr_peerMsgsImpl* msgs, time_t now)
{
logtrace(msgs, "sending a keepalive message");
evbuffer_add_uint32(msgs->outMessages, 0);
pokeBatchPeriod(msgs, ImmediatePriorityIntervalSecs);
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
}
return bytesWritten;
@ -2413,7 +2421,7 @@ static void sendBitfield(tr_peerMsgsImpl* msgs)
evbuffer_add_uint8(out, BtPeerMsgs::Bitfield);
evbuffer_add(out, bytes.data(), std::size(bytes));
logtrace(msgs, fmt::format(FMT_STRING("sending bitfield... outMessage size is now {:d}"), evbuffer_get_length(out)));
pokeBatchPeriod(msgs, ImmediatePriorityIntervalSecs);
msgs->pokeBatchPeriod(ImmediatePriorityIntervalSecs);
}
static void tellPeerWhatWeHave(tr_peerMsgsImpl* msgs)
@ -2599,9 +2607,9 @@ static void sendPex(tr_peerMsgsImpl* msgs)
evbuffer_add_uint8(out, BtPeerMsgs::Ltep);
evbuffer_add_uint8(out, msgs->ut_pex_id);
evbuffer_add_buffer(out, payload);
pokeBatchPeriod(msgs, HighPriorityIntervalSecs);
msgs->pokeBatchPeriod(HighPriorityIntervalSecs);
logtrace(msgs, fmt::format(FMT_STRING("sending a pex message; outMessage size is now {:d}"), evbuffer_get_length(out)));
dbgOutMessageLen(msgs);
msgs->dbgOutMessageLen();
evbuffer_free(payload);
tr_variantFree(&val);


@ -150,6 +150,9 @@ private:
time_t paused_until = 0;
};
void task_request_next_chunk(tr_webseed_task* task);
void onBufferGotData(evbuffer* /*buf*/, evbuffer_cb_info const* info, void* vtask);
class tr_webseed : public tr_peer
{
public:
@ -265,6 +268,28 @@ public:
publish(&e);
}
void requestBlocks(tr_block_span_t const* block_spans, size_t n_spans) override
{
auto* const tor = getTorrent();
TR_ASSERT(tor != nullptr);
TR_ASSERT(tor->isRunning);
TR_ASSERT(!tor->isDone());
if (tor == nullptr || !tor->isRunning || tor->isDone())
{
return;
}
for (auto const *span = block_spans, *end = span + n_spans; span != end; ++span)
{
auto* const task = new tr_webseed_task{ tor, this, *span };
evbuffer_add_cb(task->content(), onBufferGotData, task);
tasks.insert(task);
task_request_next_chunk(task);
tr_peerMgrClientSentRequests(tor, this, *span);
}
}
tr_torrent_id_t const torrent_id;
std::string const base_url;
tr_peer_callback const callback;
@ -408,18 +433,16 @@ void onBufferGotData(evbuffer* /*buf*/, evbuffer_cb_info const* info, void* vtas
task->webseed->gotPieceData(n_added);
}
void task_request_next_chunk(tr_webseed_task* task);
void on_idle(tr_webseed* w)
void on_idle(tr_webseed* webseed)
{
auto* const tor = w->getTorrent();
if (tor == nullptr || !tor->isRunning || tor->isDone())
auto const slots_available = webseed->connection_limiter.slotsAvailable();
if (slots_available == 0)
{
return;
}
auto const slots_available = w->connection_limiter.slotsAvailable();
if (slots_available == 0)
auto* const tor = webseed->getTorrent();
if (tor == nullptr || !tor->isRunning || tor->isDone())
{
return;
}
@ -428,17 +451,12 @@ void on_idle(tr_webseed* w)
// The actual value of '64' is arbitrary here; we could probably
// be smarter about this.
auto constexpr PreferredBlocksPerTask = size_t{ 64 };
auto const spans = tr_peerMgrGetNextRequests(tor, w, slots_available * PreferredBlocksPerTask);
for (size_t i = 0; i < slots_available && i < std::size(spans); ++i)
auto spans = tr_peerMgrGetNextRequests(tor, webseed, slots_available * PreferredBlocksPerTask);
if (std::size(spans) > slots_available)
{
auto const& span = spans[i];
auto* const task = new tr_webseed_task{ tor, w, span };
evbuffer_add_cb(task->content(), onBufferGotData, task);
w->tasks.insert(task);
task_request_next_chunk(task);
tr_peerMgrClientSentRequests(tor, w, span);
spans.resize(slots_available);
}
webseed->requestBlocks(std::data(spans), std::size(spans));
}
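
A worked sketch of the capping above, with hypothetical numbers: two free slots means asking the peer manager for up to 2 * 64 = 128 blocks; if it hands back three spans, only the first two are kept so each free slot gets exactly one task.

// Sketch, not from the commit: hypothetical numbers for the capping logic.
auto const slots_available = size_t{ 2 };  // pretend two HTTP slots are free
auto spans = std::vector<tr_block_span_t>{ { 0, 64 }, { 64, 128 }, { 128, 130 } };
if (std::size(spans) > slots_available)
{
    spans.resize(slots_available); // drop the third span; it can be re-requested on a later on_idle pass
}
// webseed->requestBlocks(std::data(spans), std::size(spans)) then creates one tr_webseed_task per span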
void onPartialDataFetched(tr_web::FetchResponse const& web_response)