mirror of
https://github.com/transmission/transmission
synced 2025-03-11 06:32:59 +00:00
feat: µTP delayed ack (#6586)
* chore: rename tr utp functions to snake_case * refactor: make udp sockets non-blocking * feat: rudimentary uTP delayed ACK * chore: housekeeping * chore: correct comment about µTP packet format
This commit is contained in:
parent
20aef2f79d
commit
3e958cfbaf
4 changed files with 94 additions and 64 deletions
|
@ -736,7 +736,7 @@ void tr_session::initImpl(init_data& data)
|
||||||
|
|
||||||
setSettings(settings, true);
|
setSettings(settings, true);
|
||||||
|
|
||||||
tr_utpInit(this);
|
tr_utp_init(this);
|
||||||
|
|
||||||
/* cleanup */
|
/* cleanup */
|
||||||
data.done_cv.notify_one();
|
data.done_cv.notify_one();
|
||||||
|
@ -1406,7 +1406,7 @@ void tr_session::closeImplPart2(std::promise<void>* closed_promise, std::chrono:
|
||||||
stats().save();
|
stats().save();
|
||||||
peer_mgr_.reset();
|
peer_mgr_.reset();
|
||||||
openFiles().close_all();
|
openFiles().close_all();
|
||||||
tr_utpClose(this);
|
tr_utp_close(this);
|
||||||
this->udp_core_.reset();
|
this->udp_core_.reset();
|
||||||
|
|
||||||
// tada we are done!
|
// tada we are done!
|
||||||
|
|
|
@ -27,8 +27,9 @@
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
/* Since we use a single UDP socket in order to implement multiple
|
|
||||||
µTP sockets, try to set up huge buffers. */
|
// Since we use a single UDP socket in order to implement multiple
|
||||||
|
// µTP sockets, try to set up huge buffers.
|
||||||
void set_socket_buffers(tr_socket_t fd, bool large)
|
void set_socket_buffers(tr_socket_t fd, bool large)
|
||||||
{
|
{
|
||||||
static auto constexpr RecvBufferSize = 4 * 1024 * 1024;
|
static auto constexpr RecvBufferSize = 4 * 1024 * 1024;
|
||||||
|
@ -41,33 +42,25 @@ void set_socket_buffers(tr_socket_t fd, bool large)
|
||||||
socklen_t sbuf_len = sizeof(sbuf);
|
socklen_t sbuf_len = sizeof(sbuf);
|
||||||
|
|
||||||
int size = large ? RecvBufferSize : SmallBufferSize;
|
int size = large ? RecvBufferSize : SmallBufferSize;
|
||||||
int rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char const*>(&size), sizeof(size));
|
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char const*>(&size), sizeof(size)) < 0)
|
||||||
|
|
||||||
if (rc < 0)
|
|
||||||
{
|
{
|
||||||
tr_logAddDebug(fmt::format("Couldn't set receive buffer: {}", tr_net_strerror(sockerrno)));
|
tr_logAddDebug(fmt::format("Couldn't set receive buffer: {}", tr_net_strerror(sockerrno)));
|
||||||
}
|
}
|
||||||
|
|
||||||
size = large ? SendBufferSize : SmallBufferSize;
|
size = large ? SendBufferSize : SmallBufferSize;
|
||||||
rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char const*>(&size), sizeof(size));
|
if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char const*>(&size), sizeof(size)) < 0)
|
||||||
|
|
||||||
if (rc < 0)
|
|
||||||
{
|
{
|
||||||
tr_logAddDebug(fmt::format("Couldn't set send buffer: {}", tr_net_strerror(sockerrno)));
|
tr_logAddDebug(fmt::format("Couldn't set send buffer: {}", tr_net_strerror(sockerrno)));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (large)
|
if (large)
|
||||||
{
|
{
|
||||||
rc = getsockopt(fd, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char*>(&rbuf), &rbuf_len);
|
if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char*>(&rbuf), &rbuf_len) < 0)
|
||||||
|
|
||||||
if (rc < 0)
|
|
||||||
{
|
{
|
||||||
rbuf = 0;
|
rbuf = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
rc = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&sbuf), &sbuf_len);
|
if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&sbuf), &sbuf_len) < 0)
|
||||||
|
|
||||||
if (rc < 0)
|
|
||||||
{
|
{
|
||||||
sbuf = 0;
|
sbuf = 0;
|
||||||
}
|
}
|
||||||
|
@ -99,41 +92,55 @@ void event_callback(evutil_socket_t s, [[maybe_unused]] short type, void* vsessi
|
||||||
auto from = sockaddr_storage{};
|
auto from = sockaddr_storage{};
|
||||||
auto fromlen = socklen_t{ sizeof(from) };
|
auto fromlen = socklen_t{ sizeof(from) };
|
||||||
auto* const from_sa = reinterpret_cast<sockaddr*>(&from);
|
auto* const from_sa = reinterpret_cast<sockaddr*>(&from);
|
||||||
|
|
||||||
auto const rc = recvfrom(s, reinterpret_cast<char*>(std::data(buf)), std::size(buf) - 1, 0, from_sa, &fromlen);
|
|
||||||
if (rc <= 0)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Since most packets we receive here are µTP, make quick inline
|
|
||||||
checks for the other protocols. The logic is as follows:
|
|
||||||
- all DHT packets start with 'd'
|
|
||||||
- all UDP tracker packets start with a 32-bit (!) "action", which
|
|
||||||
is between 0 and 3
|
|
||||||
- the above cannot be µTP packets, since these start with a 4-bit
|
|
||||||
version number (1). */
|
|
||||||
auto* const session = static_cast<tr_session*>(vsession);
|
auto* const session = static_cast<tr_session*>(vsession);
|
||||||
if (buf[0] == 'd')
|
auto got_utp_packet = false;
|
||||||
|
|
||||||
|
for (;;)
|
||||||
{
|
{
|
||||||
if (session->dht_)
|
auto const n_read = recvfrom(s, reinterpret_cast<char*>(std::data(buf)), std::size(buf) - 1, 0, from_sa, &fromlen);
|
||||||
|
if (n_read <= 0)
|
||||||
{
|
{
|
||||||
buf[rc] = '\0'; // libdht requires zero-terminated messages
|
if (got_utp_packet)
|
||||||
session->dht_->handle_message(std::data(buf), rc, from_sa, fromlen);
|
{
|
||||||
|
// To reduce protocol overhead, we wait until we've read all UDP packets
|
||||||
|
// we can, then send one ACK for each µTP socket that received packet(s).
|
||||||
|
tr_utp_issue_deferred_acks(session);
|
||||||
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
else if (rc >= 8 && buf[0] == 0 && buf[1] == 0 && buf[2] == 0 && buf[3] <= 3)
|
// Since most packets we receive here are µTP, make quick inline
|
||||||
{
|
// checks for the other protocols. The logic is as follows:
|
||||||
if (!session->announcer_udp_->handle_message(std::data(buf), rc))
|
// - all DHT packets start with 'd' (100)
|
||||||
|
// - all UDP tracker packets start with a 32-bit (!) "action", which
|
||||||
|
// is between 0 and 3
|
||||||
|
// - the above cannot be µTP packets, since these start with a 4-bit
|
||||||
|
// "type" between 0 and 4, followed by a 4-bit version number (1)
|
||||||
|
if (buf[0] == 'd')
|
||||||
{
|
{
|
||||||
tr_logAddTrace("Couldn't parse UDP tracker packet.");
|
if (session->dht_)
|
||||||
|
{
|
||||||
|
buf[n_read] = '\0'; // libdht requires zero-terminated messages
|
||||||
|
session->dht_->handle_message(std::data(buf), n_read, from_sa, fromlen);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
else if (n_read >= 8 && buf[0] == 0 && buf[1] == 0 && buf[2] == 0 && buf[3] <= 3)
|
||||||
else if (session->allowsUTP() && (session->utp_context != nullptr))
|
|
||||||
{
|
|
||||||
if (!tr_utpPacket(std::data(buf), rc, from_sa, fromlen, session))
|
|
||||||
{
|
{
|
||||||
tr_logAddTrace("Unexpected UDP packet");
|
if (!session->announcer_udp_->handle_message(std::data(buf), n_read))
|
||||||
|
{
|
||||||
|
tr_logAddTrace("Couldn't parse UDP tracker packet.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (session->allowsUTP() && session->utp_context != nullptr)
|
||||||
|
{
|
||||||
|
if (tr_utp_packet(std::data(buf), n_read, from_sa, fromlen, session))
|
||||||
|
{
|
||||||
|
got_utp_packet = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
tr_logAddTrace("Unexpected UDP packet");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -158,7 +165,18 @@ tr_session::tr_udp_core::tr_udp_core(tr_session& session, tr_port udp_port)
|
||||||
auto const addr = session_.bind_address(TR_AF_INET);
|
auto const addr = session_.bind_address(TR_AF_INET);
|
||||||
auto const [ss, sslen] = tr_socket_address::to_sockaddr(addr, udp_port_);
|
auto const [ss, sslen] = tr_socket_address::to_sockaddr(addr, udp_port_);
|
||||||
|
|
||||||
if (bind(sock, reinterpret_cast<sockaddr const*>(&ss), sslen) != 0)
|
if (evutil_make_socket_nonblocking(sock) != 0)
|
||||||
|
{
|
||||||
|
auto const error_code = errno;
|
||||||
|
tr_logAddWarn(fmt::format(
|
||||||
|
_("Couldn't make IPv4 socket non-blocking {address}: {error} ({error_code})"),
|
||||||
|
fmt::arg("address", tr_socket_address::display_name(addr, udp_port_)),
|
||||||
|
fmt::arg("error", tr_strerror(error_code)),
|
||||||
|
fmt::arg("error_code", error_code)));
|
||||||
|
|
||||||
|
tr_net_close_socket(sock);
|
||||||
|
}
|
||||||
|
else if (bind(sock, reinterpret_cast<sockaddr const*>(&ss), sslen) != 0)
|
||||||
{
|
{
|
||||||
auto const error_code = errno;
|
auto const error_code = errno;
|
||||||
tr_logAddWarn(fmt::format(
|
tr_logAddWarn(fmt::format(
|
||||||
|
@ -187,12 +205,23 @@ tr_session::tr_udp_core::tr_udp_core(tr_session& session, tr_port udp_port)
|
||||||
else if (auto sock = socket(PF_INET6, SOCK_DGRAM, 0); sock != TR_BAD_SOCKET)
|
else if (auto sock = socket(PF_INET6, SOCK_DGRAM, 0); sock != TR_BAD_SOCKET)
|
||||||
{
|
{
|
||||||
auto optval = 1;
|
auto optval = 1;
|
||||||
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char const*>(&optval), sizeof(optval));
|
(void)setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char const*>(&optval), sizeof(optval));
|
||||||
|
|
||||||
auto const addr = session_.bind_address(TR_AF_INET6);
|
auto const addr = session_.bind_address(TR_AF_INET6);
|
||||||
auto const [ss, sslen] = tr_socket_address::to_sockaddr(addr, udp_port_);
|
auto const [ss, sslen] = tr_socket_address::to_sockaddr(addr, udp_port_);
|
||||||
|
|
||||||
if (bind(sock, reinterpret_cast<sockaddr const*>(&ss), sslen) != 0)
|
if (evutil_make_socket_nonblocking(sock) != 0)
|
||||||
|
{
|
||||||
|
auto const error_code = errno;
|
||||||
|
tr_logAddWarn(fmt::format(
|
||||||
|
_("Couldn't make IPv6 socket non-blocking {address}: {error} ({error_code})"),
|
||||||
|
fmt::arg("address", tr_socket_address::display_name(addr, udp_port_)),
|
||||||
|
fmt::arg("error", tr_strerror(error_code)),
|
||||||
|
fmt::arg("error_code", error_code)));
|
||||||
|
|
||||||
|
tr_net_close_socket(sock);
|
||||||
|
}
|
||||||
|
else if (bind(sock, reinterpret_cast<sockaddr const*>(&ss), sslen) != 0)
|
||||||
{
|
{
|
||||||
auto const error_code = errno;
|
auto const error_code = errno;
|
||||||
tr_logAddWarn(fmt::format(
|
tr_logAddWarn(fmt::format(
|
||||||
|
|
|
@ -155,11 +155,11 @@ void restart_timer(tr_session* session)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* If somebody has disabled µTP, then we still want to run
|
// If somebody has disabled µTP, then we still want to run
|
||||||
utp_check_timeouts, in order to let closed sockets finish
|
// utp_check_timeouts, in order to let closed sockets finish
|
||||||
gracefully and so on. However, since we're not particularly
|
// gracefully and so on. However, since we're not particularly
|
||||||
interested in that happening in a timely manner, we might as
|
// interested in that happening in a timely manner, we might as
|
||||||
well use a large timeout. */
|
// well use a large timeout.
|
||||||
static auto constexpr MinInterval = 2s;
|
static auto constexpr MinInterval = 2s;
|
||||||
static auto constexpr MaxInterval = 3s;
|
static auto constexpr MaxInterval = 3s;
|
||||||
auto const target = MinInterval + random_percent * (MaxInterval - MinInterval);
|
auto const target = MinInterval + random_percent * (MaxInterval - MinInterval);
|
||||||
|
@ -173,15 +173,12 @@ void timer_callback(void* vsession)
|
||||||
{
|
{
|
||||||
auto* session = static_cast<tr_session*>(vsession);
|
auto* session = static_cast<tr_session*>(vsession);
|
||||||
|
|
||||||
/* utp_internal.cpp says "Should be called each time the UDP socket is drained" but it's tricky with libevent */
|
|
||||||
utp_issue_deferred_acks(session->utp_context);
|
|
||||||
|
|
||||||
utp_check_timeouts(session->utp_context);
|
utp_check_timeouts(session->utp_context);
|
||||||
restart_timer(session);
|
restart_timer(session);
|
||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
void tr_utpInit(tr_session* session)
|
void tr_utp_init(tr_session* session)
|
||||||
{
|
{
|
||||||
if (session->utp_context != nullptr)
|
if (session->utp_context != nullptr)
|
||||||
{
|
{
|
||||||
|
@ -212,17 +209,19 @@ void tr_utpInit(tr_session* session)
|
||||||
restart_timer(session);
|
restart_timer(session);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tr_utpPacket(unsigned char const* buf, size_t buflen, struct sockaddr const* from, socklen_t fromlen, tr_session* ss)
|
bool tr_utp_packet(unsigned char const* buf, size_t buflen, struct sockaddr const* from, socklen_t fromlen, tr_session* ss)
|
||||||
{
|
{
|
||||||
auto const ret = utp_process_udp(ss->utp_context, buf, buflen, from, fromlen);
|
auto const ret = utp_process_udp(ss->utp_context, buf, buflen, from, fromlen);
|
||||||
|
|
||||||
/* utp_internal.cpp says "Should be called each time the UDP socket is drained" but it's tricky with libevent */
|
|
||||||
utp_issue_deferred_acks(ss->utp_context);
|
|
||||||
|
|
||||||
return ret != 0;
|
return ret != 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tr_utpClose(tr_session* session)
|
void tr_utp_issue_deferred_acks(tr_session* ss)
|
||||||
|
{
|
||||||
|
utp_issue_deferred_acks(ss->utp_context);
|
||||||
|
}
|
||||||
|
|
||||||
|
void tr_utp_close(tr_session* session)
|
||||||
{
|
{
|
||||||
session->utp_timer.reset();
|
session->utp_timer.reset();
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,10 @@
|
||||||
|
|
||||||
struct tr_session;
|
struct tr_session;
|
||||||
|
|
||||||
void tr_utpInit(tr_session* session);
|
void tr_utp_init(tr_session* session);
|
||||||
|
|
||||||
bool tr_utpPacket(unsigned char const* buf, size_t buflen, struct sockaddr const* from, socklen_t fromlen, tr_session* ss);
|
bool tr_utp_packet(unsigned char const* buf, size_t buflen, struct sockaddr const* from, socklen_t fromlen, tr_session* ss);
|
||||||
|
|
||||||
void tr_utpClose(tr_session*);
|
void tr_utp_issue_deferred_acks(tr_session* ss);
|
||||||
|
|
||||||
|
void tr_utp_close(tr_session* session);
|
||||||
|
|
Loading…
Add table
Reference in a new issue