fix: implement proper download limit for uTP (#6416)

* fix: return read buffer size in libutp read buffer size callback

* refactor: clamp amount of data processed in `can_read_wrapper()`

* chore: housekeeping

* refactor: call `utp_read_drained()` in on-read callback

so that uTP acks can be sent out in a more timely fashion
This commit is contained in:
Yat Ho 2024-01-02 14:33:53 +08:00 committed by GitHub
parent 17a63b8026
commit 5000edef01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 29 deletions

View File

@ -375,7 +375,13 @@ void tr_peerIo::can_read_wrapper()
auto done = bool{ false };
auto err = bool{ false };
while (!done && !err)
// In normal conditions, only continue processing if we still have bandwidth
// quota for it.
//
// The read buffer will grow indefinitely if libutp or the TCP stack keeps buffering
// data faster than the bandwidth limit allows. To safeguard against that, we keep
// processing if the read buffer is more than twice as large as the target size.
while (!done && !err && (read_buffer_size() > RcvBuf * 2U || bandwidth().clamp(TR_DOWN, read_buffer_size()) != 0U))
{
size_t piece = 0U;
auto const old_len = read_buffer_size();
@ -383,17 +389,14 @@ void tr_peerIo::can_read_wrapper()
auto const used = old_len - read_buffer_size();
auto const overhead = socket_.guess_packet_overhead(used);
if (piece != 0U || piece != used)
if (piece != 0U)
{
if (piece != 0U)
{
bandwidth().notify_bandwidth_consumed(TR_DOWN, piece, true, now);
}
bandwidth().notify_bandwidth_consumed(TR_DOWN, piece, true, now);
}
if (used != piece)
{
bandwidth().notify_bandwidth_consumed(TR_DOWN, used - piece, false, now);
}
if (used != piece)
{
bandwidth().notify_bandwidth_consumed(TR_DOWN, used - piece, false, now);
}
if (overhead > 0U)
@ -725,6 +728,14 @@ void tr_peerIo::utp_init([[maybe_unused]] struct_utp_context* ctx)
io->inbuf_.add(args->buf, args->len);
io->set_enabled(TR_DOWN, true);
io->can_read_wrapper();
// utp_read_drained() notifies libutp that this read buffer is empty.
// It opens up the congestion window by sending an ACK (soonish) if
// one was not going to be sent.
if (std::empty(io->inbuf_))
{
utp_read_drained(io->socket_.handle.utp);
}
}
return {};
});
@ -736,16 +747,7 @@ void tr_peerIo::utp_init([[maybe_unused]] struct_utp_context* ctx)
{
if (auto const* const io = static_cast<tr_peerIo*>(utp_get_userdata(args->socket)); io != nullptr)
{
// We use this callback to enforce speed limits by telling
// libutp to read no more than `target_dl_bytes` bytes.
auto const target_dl_bytes = io->bandwidth_.clamp(TR_DOWN, RcvBuf);
// libutp's private function get_rcv_window() allows libutp
// to read up to (UTP_RCVBUF - READ_BUFFER_SIZE) bytes and
// UTP_RCVBUF is set to `RcvBuf` by tr_peerIo::utp_init().
// So to limit dl to `target_dl_bytes`, we need to return
// N where (`target_dl_bytes` == RcvBuf - N).
return RcvBuf - target_dl_bytes;
return io->read_buffer_size();
}
return {};
});

View File

@ -341,8 +341,10 @@ private:
}
}
#ifdef WITH_UTP
void on_utp_state_change(int new_state);
void on_utp_error(int errcode);
#endif
void close();

View File

@ -122,15 +122,9 @@ size_t tr_peer_socket::try_read(InBuf& buf, size_t max, [[maybe_unused]] bool bu
return buf.add_socket(handle.tcp, max, error);
}
#ifdef WITH_UTP
// utp_read_drained() notifies libutp that this read buffer is empty.
// It opens up the congestion window by sending an ACK (soonish) if
// one was not going to be sent.
if (is_utp() && buf_is_empty)
{
utp_read_drained(handle.utp);
}
#endif
// Unlike conventional BSD-style socket API, libutp pushes incoming data to the
// caller via a callback, instead of allowing the caller to pull data from
// its buffers. Therefore, reading data from a uTP socket is not handled here.
return {};
}