feat: ipv6 lpd

This commit is contained in:
Yat Ho 2024-03-15 15:03:31 +08:00
parent 821a6816ef
commit 9bcb86ebb0
2 changed files with 226 additions and 86 deletions

View File

@ -14,6 +14,7 @@
#include <optional>
#include <string>
#include <string_view>
#include <type_traits>
#include <vector>
#ifdef _WIN32
@ -47,6 +48,8 @@ using namespace std::literals;
namespace
{
using ipp_t = std::underlying_type_t<tr_address_type>;
// opaque value, allowing the sending client to filter out its
// own announces if it receives them via multicast loopback
auto makeCookie()
@ -62,8 +65,8 @@ auto makeCookie()
return std::string{ std::data(buf), std::size(buf) };
}
constexpr char const* const McastGroup = "239.192.152.143"; /**<LPD multicast group */
auto constexpr McastPort = tr_port::from_host(6771); /**<LPD source and destination UPD port */
auto constexpr McastSockAddr = std::array{ "239.192.152.143:6771"sv, "[ff15::efc0:988f]:6771"sv };
static_assert(std::size(McastSockAddr) == NUM_TR_AF_INET_TYPES);
/*
* A LSD announce is formatted as follows:
@ -84,14 +87,23 @@ auto constexpr McastPort = tr_port::from_host(6771); /**<LPD source and destinat
* multiple infohashes the packet length should not exceed 1400
* bytes to avoid MTU/fragmentation problems.
*/
auto makeAnnounceMsg(std::string_view cookie, tr_port port, std::vector<std::string_view> const& info_hash_strings)
std::string makeAnnounceMsg(
tr_address_type ip_protocol,
std::string_view cookie,
tr_port port,
std::vector<std::string_view> const& info_hash_strings)
{
TR_ASSERT(tr_address::is_valid(ip_protocol));
if (!tr_address::is_valid(ip_protocol))
{
return {};
}
auto ret = fmt::format(
"BT-SEARCH * HTTP/1.1\r\n"
"Host: {:s}:{:d}\r\n"
"Host: {:s}\r\n"
"Port: {:d}\r\n",
McastGroup,
McastPort.host(),
McastSockAddr[ip_protocol],
port.host());
for (auto const& info_hash : info_hash_strings)
@ -230,11 +242,17 @@ public:
~tr_lpd_impl() override
{
event_.reset();
if (mcast_socket_ != TR_BAD_SOCKET)
for (auto& event : events_)
{
tr_net_close_socket(mcast_socket_);
event.reset();
}
for (auto const sock : mcast_socket_)
{
if (sock != TR_BAD_SOCKET)
{
tr_net_close_socket(sock);
}
}
tr_logAddTrace("Done uninitialising Local Peer Discovery");
@ -243,19 +261,34 @@ public:
private:
bool init(struct event_base* event_base)
{
if (initImpl(event_base))
ipp_t n_success = NUM_TR_AF_INET_TYPES;
if (!initImpl<TR_AF_INET>(event_base))
{
return true;
auto const err = sockerrno;
tr_net_close_socket(mcast_socket_[TR_AF_INET]);
mcast_socket_[TR_AF_INET] = TR_BAD_SOCKET;
tr_logAddWarn(fmt::format(
_("Couldn't initialize {ip_protocol} LPD: {error} ({error_code})"),
fmt::arg("ip_protocol", tr_ip_protocol_to_sv(TR_AF_INET)),
fmt::arg("error", tr_strerror(err)),
fmt::arg("error_code", err)));
--n_success;
}
auto const err = sockerrno;
tr_net_close_socket(mcast_socket_);
mcast_socket_ = TR_BAD_SOCKET;
tr_logAddWarn(fmt::format(
_("Couldn't initialize LPD: {error} ({error_code})"),
fmt::arg("error", tr_strerror(err)),
fmt::arg("error_code", err)));
return false;
if (!initImpl<TR_AF_INET6>(event_base))
{
auto const err = sockerrno;
tr_net_close_socket(mcast_socket_[TR_AF_INET6]);
mcast_socket_[TR_AF_INET6] = TR_BAD_SOCKET;
tr_logAddWarn(fmt::format(
_("Couldn't initialize {ip_protocol} LPD: {error} ({error_code})"),
fmt::arg("ip_protocol", tr_ip_protocol_to_sv(TR_AF_INET6)),
fmt::arg("error", tr_strerror(err)),
fmt::arg("error_code", err)));
--n_success;
}
return n_success != 0U;
}
/**
@ -263,89 +296,159 @@ private:
*
* For the most part, this means setting up an appropriately configured multicast socket
* and event-based message handling.
*
* @remark Since the LPD service does not use another protocol family yet, this code is
* IPv4 only for the time being.
*/
template<tr_address_type ip_protocol>
bool initImpl(struct event_base* event_base)
{
auto const opt_on = 1;
auto& sock = mcast_socket_[ip_protocol];
static_assert(AnnounceScope > 0);
static_assert(tr_address::is_valid(ip_protocol));
tr_logAddDebug("Initialising Local Peer Discovery");
tr_logAddDebug(fmt::format("Initialising {} Local Peer Discovery", tr_ip_protocol_to_sv(ip_protocol)));
/* setup datagram socket */
{
mcast_socket_ = socket(PF_INET, SOCK_DGRAM, 0);
sock = socket(tr_ip_protocol_to_af(ip_protocol), SOCK_DGRAM, 0);
if (mcast_socket_ == TR_BAD_SOCKET)
if (sock == TR_BAD_SOCKET)
{
return false;
}
if (evutil_make_socket_nonblocking(mcast_socket_) == -1)
if (evutil_make_socket_nonblocking(sock) == -1)
{
return false;
}
if (setsockopt(mcast_socket_, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char const*>(&opt_on), sizeof(opt_on)) ==
-1)
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char const*>(&opt_on), sizeof(opt_on)) == -1)
{
return false;
}
#if HAVE_SO_REUSEPORT
if (setsockopt(mcast_socket_, SOL_SOCKET, SO_REUSEPORT, reinterpret_cast<char const*>(&opt_on), sizeof(opt_on)) ==
-1)
if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, reinterpret_cast<char const*>(&opt_on), sizeof(opt_on)) == -1)
{
return false;
}
#endif
auto const [bind_ss, bind_sslen] = tr_socket_address::to_sockaddr(mediator_.bind_address(TR_AF_INET), McastPort);
if constexpr (ip_protocol == TR_AF_INET6)
{
// must be done before binding on Linux
if (evutil_make_listen_socket_ipv6only(sock) == -1)
{
return false;
}
}
if (bind(mcast_socket_, reinterpret_cast<sockaddr const*>(&bind_ss), bind_sslen) == -1)
auto mcast_ss = sockaddr_storage{};
auto mcast_sslen = int{ sizeof(mcast_ss) };
if (evutil_parse_sockaddr_port(
std::data(McastSockAddr[ip_protocol]),
reinterpret_cast<sockaddr*>(&mcast_ss),
&mcast_sslen) == -1)
{
return false;
}
auto mcast_sockaddr = tr_socket_address::from_sockaddr(reinterpret_cast<sockaddr*>(&mcast_ss));
TR_ASSERT(mcast_sockaddr);
auto const [bind_ss, bind_sslen] = tr_socket_address::to_sockaddr(
tr_address::any(ip_protocol),
mcast_sockaddr->port());
if (bind(sock, reinterpret_cast<sockaddr const*>(&bind_ss), bind_sslen) == -1)
{
return false;
}
auto const mcast_addr = tr_address::from_string(McastGroup);
TR_ASSERT(mcast_addr);
auto const [mcast_ss, mcast_sslen] = tr_socket_address::to_sockaddr(*mcast_addr, McastPort);
std::memcpy(&mcast_addr_, &mcast_ss, mcast_sslen);
/* we want to join that LPD multicast group */
ip_mreq mcast_req = {};
mcast_req.imr_multiaddr = mcast_addr_.sin_addr;
mcast_req.imr_interface = reinterpret_cast<sockaddr_in const*>(&bind_ss)->sin_addr;
if (setsockopt(
mcast_socket_,
IPPROTO_IP,
IP_ADD_MEMBERSHIP,
reinterpret_cast<char const*>(&mcast_req),
sizeof(mcast_req)) == -1)
if constexpr (ip_protocol == TR_AF_INET)
{
return false;
std::memcpy(&mcast_addr_, &mcast_ss, mcast_sslen);
/* we want to join that LPD multicast group */
struct ip_mreq mcast_req = {};
mcast_req.imr_multiaddr = mcast_addr_.sin_addr;
mcast_req.imr_interface = mediator_.bind_address(ip_protocol).addr.addr4;
if (setsockopt(
sock,
IPPROTO_IP,
IP_ADD_MEMBERSHIP,
reinterpret_cast<char const*>(&mcast_req),
sizeof(mcast_req)) == -1)
{
return false;
}
/* configure outbound multicast TTL */
if (setsockopt(
sock,
IPPROTO_IP,
IP_MULTICAST_TTL,
reinterpret_cast<char const*>(&AnnounceScope),
sizeof(AnnounceScope)) == -1)
{
return false;
}
if (setsockopt(
sock,
IPPROTO_IP,
IP_MULTICAST_IF,
reinterpret_cast<char const*>(&mcast_req.imr_interface),
sizeof(mcast_req.imr_interface)) == -1)
{
return false;
}
}
/* configure outbound multicast TTL */
if (setsockopt(
mcast_socket_,
IPPROTO_IP,
IP_MULTICAST_TTL,
reinterpret_cast<char const*>(&AnnounceScope),
sizeof(AnnounceScope)) == -1)
else // TR_AF_INET6
{
return false;
std::memcpy(&mcast6_addr_, &mcast_ss, mcast_sslen);
/* we want to join that LPD multicast group */
struct ipv6_mreq mcast_req = {};
mcast_req.ipv6mr_multiaddr = mcast6_addr_.sin6_addr;
mcast_req.ipv6mr_interface = 0; //FIXME
if (setsockopt(
sock,
IPPROTO_IPV6,
IPV6_JOIN_GROUP,
reinterpret_cast<char const*>(&mcast_req),
sizeof(mcast_req)) == -1)
{
return false;
}
/* configure outbound multicast TTL */
if (setsockopt(
sock,
IPPROTO_IPV6,
IPV6_MULTICAST_HOPS,
reinterpret_cast<char const*>(&AnnounceScope),
sizeof(AnnounceScope)) == -1)
{
return false;
}
if (setsockopt(
sock,
IPPROTO_IPV6,
IPV6_MULTICAST_IF,
reinterpret_cast<char const*>(&mcast_req.ipv6mr_interface),
sizeof(mcast_req.ipv6mr_interface)) == -1)
{
return false;
}
}
}
event_.reset(event_new(event_base, mcast_socket_, EV_READ | EV_PERSIST, event_callback, this));
event_add(event_.get(), nullptr);
events_[ip_protocol].reset(event_new(event_base, sock, EV_READ | EV_PERSIST, event_callback<ip_protocol>, this));
event_add(events_[ip_protocol].get(), nullptr);
tr_logAddDebug("Local Peer Discovery initialised");
tr_logAddDebug(fmt::format("{} Local Peer Discovery initialised", tr_ip_protocol_to_sv(ip_protocol)));
return true;
}
@ -354,27 +457,34 @@ private:
* @brief Processing of timeout notifications and incoming data on the socket
* @note maximum rate of read events is limited according to @a lpd_maxAnnounceCap
* @see DoS */
template<tr_address_type ip_protocol>
static void event_callback(evutil_socket_t /*s*/, short type, void* vself)
{
if ((type & EV_READ) != 0)
{
static_cast<tr_lpd_impl*>(vself)->onCanRead();
static_cast<tr_lpd_impl*>(vself)->onCanRead(ip_protocol);
}
}
void onCanRead()
void onCanRead(tr_address_type ip_protocol)
{
TR_ASSERT(tr_address::is_valid(ip_protocol));
if (!tr_address::is_valid(ip_protocol))
{
return;
}
if (!mediator_.allowsLPD())
{
return;
}
// process announcement from foreign peer
struct sockaddr_in foreign_addr = {};
struct sockaddr_storage foreign_addr = {};
auto addr_len = socklen_t{ sizeof(foreign_addr) };
auto foreign_msg = std::array<char, MaxDatagramLength>{};
auto const res = recvfrom(
mcast_socket_,
mcast_socket_[ip_protocol],
std::data(foreign_msg),
MaxDatagramLength,
0,
@ -386,6 +496,7 @@ private:
{
return;
}
TR_ASSERT(tr_af_to_ip_protocol(foreign_addr.ss_family) == ip_protocol);
// If it doesn't look like a BEP14 message, discard it
auto const msg = std::string_view{ std::data(foreign_msg), static_cast<size_t>(res) };
@ -410,10 +521,14 @@ private:
return;
}
auto [peer_addr, compact] = tr_address::from_compact_ipv4(reinterpret_cast<std::byte*>(&foreign_addr.sin_addr));
auto peer_sockaddr = tr_socket_address::from_sockaddr(reinterpret_cast<sockaddr*>(&foreign_addr));
if (!peer_sockaddr)
{
return;
}
for (auto const& hash_string : parsed->info_hash_strings)
{
if (!mediator_.onPeerFound(hash_string, peer_addr, parsed->port))
if (!mediator_.onPeerFound(hash_string, peer_sockaddr->address(), parsed->port))
{
tr_logAddDebug(fmt::format("Cannot serve torrent #{:s}", hash_string));
}
@ -462,19 +577,30 @@ private:
std::sort(std::begin(torrents), std::end(torrents), TorrentComparator);
// cram in as many as will fit in a message
auto const baseline_size = std::size(makeAnnounceMsg(cookie_, mediator_.port(), {}));
auto const size_with_one = std::size(makeAnnounceMsg(cookie_, mediator_.port(), { torrents.front().info_hash_str }));
auto const size_per_hash = size_with_one - baseline_size;
auto baseline_size = size_t{};
for (ipp_t ipp = 0; ipp < NUM_TR_AF_INET_TYPES; ++ipp)
{
baseline_size = std::max(
baseline_size,
std::size(makeAnnounceMsg(static_cast<tr_address_type>(ipp), cookie_, mediator_.port(), {})));
}
auto const size_per_hash = std::size(torrents.front().info_hash_str);
auto const max_torrents_per_announce = (MaxDatagramLength - baseline_size) / size_per_hash;
auto const torrents_this_announce = std::min(std::size(torrents), max_torrents_per_announce);
auto info_hash_strings = std::vector<std::string_view>{};
info_hash_strings.resize(std::min(std::size(torrents), max_torrents_per_announce));
info_hash_strings.reserve(torrents_this_announce);
std::transform(
std::begin(torrents),
std::begin(torrents) + std::size(info_hash_strings),
std::begin(info_hash_strings),
std::begin(torrents) + torrents_this_announce,
std::back_inserter(info_hash_strings),
[](auto const& tor) { return tor.info_hash_str; });
if (!sendAnnounce(info_hash_strings))
auto success = false;
for (ipp_t ipp = 0; ipp < NUM_TR_AF_INET_TYPES; ++ipp)
{
success |= sendAnnounce(static_cast<tr_address_type>(ipp), info_hash_strings);
}
if (!success)
{
return;
}
@ -508,28 +634,40 @@ private:
* matter). A listening client on the same network might react by adding us to his
* peer pool for torrent t.
*/
bool sendAnnounce(std::vector<std::string_view> const& info_hash_strings)
bool sendAnnounce(tr_address_type ip_protocol, std::vector<std::string_view> const& info_hash_strings)
{
auto const announce = makeAnnounceMsg(cookie_, mediator_.port(), info_hash_strings);
TR_ASSERT(tr_address::is_valid(ip_protocol));
if (!tr_address::is_valid(ip_protocol))
{
return false;
}
if (mcast_socket_[ip_protocol] == TR_BAD_SOCKET)
{
return true;
}
auto const announce = makeAnnounceMsg(ip_protocol, cookie_, mediator_.port(), info_hash_strings);
TR_ASSERT(std::size(announce) <= MaxDatagramLength);
auto const res = sendto(
mcast_socket_,
mcast_socket_[ip_protocol],
std::data(announce),
std::size(announce),
0,
reinterpret_cast<sockaddr const*>(&mcast_addr_),
sizeof(mcast_addr_));
auto const sent = res == static_cast<int>(std::size(announce));
return sent;
ip_protocol == TR_AF_INET ? reinterpret_cast<sockaddr const*>(&mcast_addr_) :
reinterpret_cast<sockaddr const*>(&mcast6_addr_),
ip_protocol == TR_AF_INET ? sizeof(mcast_addr_) : sizeof(mcast6_addr_));
return res == static_cast<int>(std::size(announce));
}
std::string const cookie_ = makeCookie();
Mediator& mediator_;
tr_socket_t mcast_socket_ = TR_BAD_SOCKET; /**multicast socket */
libtransmission::evhelpers::event_unique_ptr event_;
std::array<tr_socket_t, NUM_TR_AF_INET_TYPES> mcast_socket_ = { TR_BAD_SOCKET, TR_BAD_SOCKET }; /**multicast socket */
std::array<libtransmission::evhelpers::event_unique_ptr, NUM_TR_AF_INET_TYPES> events_;
static auto constexpr MaxDatagramLength = size_t{ 1400 };
sockaddr_in mcast_addr_ = {}; /**<initialized from the above constants in init() */
sockaddr_in6 mcast6_addr_ = {}; /**<initialized from the above constants in init() */
// BEP14: "To avoid causing multicast storms on large networks a
// client should send no more than 1 announce per minute."

View File

@ -41,9 +41,11 @@ public:
{
}
[[nodiscard]] tr_address bind_address(tr_address_type /* type */) const override
[[nodiscard]] tr_address bind_address(tr_address_type type) const override
{
return {};
auto ret = tr_address{};
ret.type = type;
return ret;
}
[[nodiscard]] tr_port port() const override