refactor: decouple peer limits from fdlimit (#2969)

This commit is contained in:
Charles Kerr 2022-04-22 16:15:06 -05:00 committed by GitHub
parent 1ebac744a5
commit 0a3018d706
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 118 additions and 160 deletions

View File

@ -380,7 +380,6 @@ static struct tr_cached_file* fileset_get_empty_slot(struct tr_fileset* set)
struct tr_fdInfo
{
int peerCount;
struct tr_fileset fileset;
};
@ -501,114 +500,3 @@ tr_sys_file_t tr_fdFileCheckout(
o->used_at = tr_time();
return o->fd;
}
/***
****
**** Sockets
****
***/
tr_socket_t tr_fdSocketCreate(tr_session* session, int domain, int type)
{
TR_ASSERT(tr_isSession(session));
tr_socket_t s = TR_BAD_SOCKET;
ensureSessionFdInfoExists(session);
tr_fdInfo* gFd = session->fdInfo;
if (gFd->peerCount < session->peerLimit)
{
s = socket(domain, type, 0);
if ((s == TR_BAD_SOCKET) && (sockerrno != EAFNOSUPPORT))
{
tr_logAddWarn(fmt::format(
_("Couldn't create socket: {error} ({error_code})"),
fmt::arg("error", tr_net_strerror(sockerrno)),
fmt::arg("error_code", sockerrno)));
}
}
if (s != TR_BAD_SOCKET)
{
++gFd->peerCount;
}
TR_ASSERT(gFd->peerCount >= 0);
if (s != TR_BAD_SOCKET)
{
static bool buf_logged = false;
if (!buf_logged)
{
int i = 0;
socklen_t size = sizeof(i);
if (getsockopt(s, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&i), &size) != -1)
{
tr_logAddTrace(fmt::format("SO_SNDBUF size is {}", i));
}
i = 0;
size = sizeof(i);
if (getsockopt(s, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char*>(&i), &size) != -1)
{
tr_logAddTrace(fmt::format("SO_RCVBUF size is {}", i));
}
buf_logged = true;
}
}
return s;
}
tr_socket_t tr_fdSocketAccept(tr_session* s, tr_socket_t sockfd, tr_address* addr, tr_port* port)
{
TR_ASSERT(tr_isSession(s));
TR_ASSERT(addr != nullptr);
TR_ASSERT(port != nullptr);
ensureSessionFdInfoExists(s);
tr_fdInfo* const gFd = s->fdInfo;
struct sockaddr_storage sock;
socklen_t len = sizeof(struct sockaddr_storage);
tr_socket_t fd = accept(sockfd, (struct sockaddr*)&sock, &len);
if (fd != TR_BAD_SOCKET)
{
if (gFd->peerCount < s->peerLimit && tr_address_from_sockaddr_storage(addr, port, &sock))
{
++gFd->peerCount;
}
else
{
tr_netCloseSocket(fd);
fd = TR_BAD_SOCKET;
}
}
return fd;
}
void tr_fdSocketClose(tr_session* session, tr_socket_t fd)
{
TR_ASSERT(tr_isSession(session));
if (session->fdInfo != nullptr)
{
struct tr_fdInfo* gFd = session->fdInfo;
if (fd != TR_BAD_SOCKET)
{
tr_netCloseSocket(fd);
--gFd->peerCount;
}
TR_ASSERT(gFd->peerCount >= 0);
}
}

View File

@ -66,15 +66,6 @@ void tr_fdFileClose(tr_session* session, tr_torrent const* tor, tr_file_index_t
*/
void tr_fdTorrentClose(tr_session* session, int torrentId);
/***********************************************************************
* Sockets
**********************************************************************/
tr_socket_t tr_fdSocketCreate(tr_session* session, int domain, int type);
tr_socket_t tr_fdSocketAccept(tr_session* session, tr_socket_t listening_sockfd, tr_address* addr, tr_port* port);
void tr_fdSocketClose(tr_session* session, tr_socket_t s);
/***********************************************************************
* tr_fdClose
***********************************************************************

View File

@ -6,6 +6,7 @@
#include <array>
#include <cerrno>
#include <climits>
#include <cstdint>
#include <cstring>
#include <ctime>
#include <string_view>
@ -23,20 +24,18 @@
#include <fmt/core.h>
#include <cstdint>
#include <libutp/utp.h>
#include "transmission.h"
#include "fdlimit.h" /* tr_fdSocketClose() */
#include "log.h"
#include "net.h"
#include "peer-socket.h" /* for struct tr_peer_socket */
#include "session.h" /* tr_sessionGetPublicAddress() */
#include "peer-socket.h"
#include "session.h"
#include "tr-assert.h"
#include "tr-macros.h"
#include "tr-utp.h" /* tr_utpSendTo() */
#include "utils.h" /* tr_time() */
#include "tr-utp.h"
#include "utils.h"
#ifndef IN_MULTICAST
#define IN_MULTICAST(a) (((a)&0xf0000000) == 0xe0000000)
@ -293,29 +292,72 @@ static socklen_t setup_sockaddr(tr_address const* addr, tr_port port, struct soc
return sizeof(struct sockaddr_in6);
}
struct tr_peer_socket tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr_port port, bool clientIsSeed)
static tr_socket_t createSocket(tr_session* session, int domain, int type)
{
TR_ASSERT(tr_isSession(session));
auto const sockfd = socket(domain, type, 0);
if (sockfd == TR_BAD_SOCKET)
{
if (sockerrno != EAFNOSUPPORT)
{
tr_logAddWarn(fmt::format(
_("Couldn't create socket: {error} ({error_code})"),
fmt::arg("error", tr_net_strerror(sockerrno)),
fmt::arg("error_code", sockerrno)));
}
return TR_BAD_SOCKET;
}
if ((evutil_make_socket_nonblocking(sockfd) == -1) || !session->incPeerCount())
{
tr_netClose(session, sockfd);
return {};
}
if (static bool buf_logged = false; !buf_logged)
{
int i = 0;
socklen_t size = sizeof(i);
if (getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&i), &size) != -1)
{
tr_logAddTrace(fmt::format("SO_SNDBUF size is {}", i));
}
i = 0;
size = sizeof(i);
if (getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char*>(&i), &size) != -1)
{
tr_logAddTrace(fmt::format("SO_RCVBUF size is {}", i));
}
buf_logged = true;
}
return sockfd;
}
struct tr_peer_socket tr_netOpenPeerSocket(tr_session* session, tr_address const* addr, tr_port port, bool client_is_seed)
{
TR_ASSERT(tr_address_is_valid(addr));
auto ret = tr_peer_socket{};
static int const domains[NUM_TR_AF_INET_TYPES] = { AF_INET, AF_INET6 };
struct sockaddr_storage sock;
struct sockaddr_storage source_sock;
if (!tr_address_is_valid_for_peers(addr, port))
{
return ret;
return {};
}
auto const s = tr_fdSocketCreate(session, domains[addr->type], SOCK_STREAM);
static auto constexpr Domains = std::array<int, NUM_TR_AF_INET_TYPES>{ AF_INET, AF_INET6 };
auto const s = createSocket(session, Domains[addr->type], SOCK_STREAM);
if (s == TR_BAD_SOCKET)
{
return ret;
return {};
}
/* seeds don't need much of a read buffer... */
if (clientIsSeed)
// seeds don't need a big read buffer, so make it smaller
if (client_is_seed)
{
int n = 8192;
@ -325,17 +367,13 @@ struct tr_peer_socket tr_netOpenPeerSocket(tr_session* session, tr_address const
}
}
if (evutil_make_socket_nonblocking(s) == -1)
{
tr_netClose(session, s);
return ret;
}
struct sockaddr_storage sock;
socklen_t const addrlen = setup_sockaddr(addr, port, &sock);
/* set source address */
// set source address
tr_address const* const source_addr = tr_sessionGetPublicAddress(session, addr->type, nullptr);
TR_ASSERT(source_addr != nullptr);
struct sockaddr_storage source_sock;
socklen_t const sourcelen = setup_sockaddr(source_addr, {}, &source_sock);
if (bind(s, (struct sockaddr*)&source_sock, sourcelen) == -1)
@ -347,9 +385,10 @@ struct tr_peer_socket tr_netOpenPeerSocket(tr_session* session, tr_address const
fmt::arg("error", tr_net_strerror(sockerrno)),
fmt::arg("error_code", sockerrno)));
tr_netClose(session, s);
return ret;
return {};
}
auto ret = tr_peer_socket{};
if (connect(s, (struct sockaddr*)&sock, addrlen) == -1 &&
#ifdef _WIN32
sockerrno != WSAEWOULDBLOCK &&
@ -383,7 +422,11 @@ struct tr_peer_socket tr_netOpenPeerSocket(tr_session* session, tr_address const
return ret;
}
struct tr_peer_socket tr_netOpenPeerUTPSocket(tr_session* session, tr_address const* addr, tr_port port, bool /*clientIsSeed*/)
struct tr_peer_socket tr_netOpenPeerUTPSocket(
tr_session* session,
tr_address const* addr,
tr_port port,
bool /*client_is_seed*/)
{
auto ret = tr_peer_socket{};
@ -548,27 +591,43 @@ bool tr_net_hasIPv6(tr_port port)
return result;
}
tr_socket_t tr_netAccept(tr_session* session, tr_socket_t b, tr_address* addr, tr_port* port)
tr_socket_t tr_netAccept(tr_session* session, tr_socket_t listening_sockfd, tr_address* addr, tr_port* port)
{
tr_socket_t fd = tr_fdSocketAccept(session, b, addr, port);
TR_ASSERT(tr_isSession(session));
TR_ASSERT(addr != nullptr);
TR_ASSERT(port != nullptr);
if (fd != TR_BAD_SOCKET && evutil_make_socket_nonblocking(fd) == -1)
// accept the incoming connection
struct sockaddr_storage sock;
socklen_t len = sizeof(struct sockaddr_storage);
auto sockfd = accept(listening_sockfd, (struct sockaddr*)&sock, &len);
if (sockfd == TR_BAD_SOCKET)
{
tr_netClose(session, fd);
fd = TR_BAD_SOCKET;
return TR_BAD_SOCKET;
}
return fd;
// get the address and port,
// make the socket unblocking,
// and confirm we don't have too many peers
if (!tr_address_from_sockaddr_storage(addr, port, &sock) || evutil_make_socket_nonblocking(sockfd) == -1 ||
!session->incPeerCount())
{
tr_netCloseSocket(sockfd);
return TR_BAD_SOCKET;
}
return sockfd;
}
void tr_netCloseSocket(tr_socket_t fd)
void tr_netCloseSocket(tr_socket_t sockfd)
{
evutil_closesocket(fd);
evutil_closesocket(sockfd);
}
void tr_netClose(tr_session* session, tr_socket_t s)
void tr_netClose(tr_session* session, tr_socket_t sockfd)
{
tr_fdSocketClose(session, s);
tr_netCloseSocket(sockfd);
session->decPeerCount();
}
/*

View File

@ -254,6 +254,25 @@ public:
tr_netSetTOS(sock, peer_socket_tos_, type);
}
[[nodiscard]] constexpr bool incPeerCount() noexcept
{
if (this->peerCount >= this->peerLimit)
{
return false;
}
++this->peerCount;
return true;
}
constexpr void decPeerCount() noexcept
{
if (this->peerCount > 0)
{
--this->peerCount;
}
}
// bandwidth
[[nodiscard]] Bandwidth& getBandwidthGroup(std::string_view name);
@ -308,8 +327,9 @@ public:
struct evdns_base* evdns_base;
struct tr_event_handle* events;
uint16_t peerLimit;
uint16_t peerLimitPerTorrent;
uint16_t peerCount = 0;
uint16_t peerLimit = 200;
uint16_t peerLimitPerTorrent = 50;
int uploadSlotsPerTorrent;