1
0
Fork 0
mirror of https://github.com/transmission/transmission synced 2025-01-03 05:25:52 +00:00

refactor: use one socket for sending and receiving lpd announces (#5909)

This commit is contained in:
Yat Ho 2023-08-19 04:56:28 +08:00 committed by GitHub
parent eea7d4d886
commit bb386cf17f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -6,6 +6,7 @@
#include <array>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <optional>
@ -228,14 +229,9 @@ public:
{
event_.reset();
if (mcast_rcv_socket_ != TR_BAD_SOCKET)
if (mcast_socket_ != TR_BAD_SOCKET)
{
evutil_closesocket(mcast_rcv_socket_);
}
if (mcast_snd_socket_ != TR_BAD_SOCKET)
{
evutil_closesocket(mcast_snd_socket_);
evutil_closesocket(mcast_socket_);
}
tr_logAddTrace("Done uninitialising Local Peer Discovery");
@ -250,10 +246,8 @@ private:
}
auto const err = sockerrno;
evutil_closesocket(mcast_rcv_socket_);
evutil_closesocket(mcast_snd_socket_);
mcast_rcv_socket_ = TR_BAD_SOCKET;
mcast_snd_socket_ = TR_BAD_SOCKET;
evutil_closesocket(mcast_socket_);
mcast_socket_ = TR_BAD_SOCKET;
tr_logAddWarn(fmt::format(
_("Couldn't initialize LPD: {error} ({error_code})"),
fmt::arg("error", tr_strerror(err)),
@ -272,127 +266,80 @@ private:
*/
bool initImpl(struct event_base* event_base)
{
int const opt_on = 1;
auto const opt_on = int{ 1 };
static_assert(AnnounceScope > 0);
tr_logAddDebug("Initialising Local Peer Discovery");
/* setup datagram socket (receive) */
/* setup datagram socket */
{
mcast_rcv_socket_ = socket(PF_INET, SOCK_DGRAM, 0);
mcast_socket_ = socket(PF_INET, SOCK_DGRAM, 0);
if (mcast_rcv_socket_ == TR_BAD_SOCKET)
if (mcast_socket_ == TR_BAD_SOCKET)
{
return false;
}
if (evutil_make_socket_nonblocking(mcast_rcv_socket_) == -1)
if (evutil_make_socket_nonblocking(mcast_socket_) == -1)
{
return false;
}
if (setsockopt(
mcast_rcv_socket_,
SOL_SOCKET,
SO_REUSEADDR,
reinterpret_cast<char const*>(&opt_on),
sizeof(opt_on)) == -1)
if (setsockopt(mcast_socket_, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char const*>(&opt_on), sizeof(opt_on)) ==
-1)
{
return false;
}
#if HAVE_SO_REUSEPORT
if (setsockopt(
mcast_rcv_socket_,
SOL_SOCKET,
SO_REUSEPORT,
reinterpret_cast<char const*>(&opt_on),
sizeof(opt_on)) == -1)
if (setsockopt(mcast_socket_, SOL_SOCKET, SO_REUSEPORT, reinterpret_cast<char const*>(&opt_on), sizeof(opt_on)) ==
-1)
{
return false;
}
#endif
mcast_addr_ = {};
mcast_addr_.sin_family = AF_INET;
mcast_addr_.sin_port = McastPort.network();
mcast_addr_.sin_addr.s_addr = INADDR_ANY;
auto const [bind_ss, bind_sslen] = tr_socket_address::to_sockaddr(mediator_.bind_address(TR_AF_INET), McastPort);
if (bind(mcast_rcv_socket_, reinterpret_cast<sockaddr*>(&mcast_addr_), sizeof(mcast_addr_)) == -1)
if (bind(mcast_socket_, reinterpret_cast<sockaddr const*>(&bind_ss), bind_sslen) == -1)
{
return false;
}
if (evutil_inet_pton(mcast_addr_.sin_family, McastGroup, &mcast_addr_.sin_addr) == -1)
{
return false;
}
auto const mcast_addr = tr_address::from_string(McastGroup);
TR_ASSERT(mcast_addr);
auto const [mcast_ss, mcast_sslen] = tr_socket_address::to_sockaddr(*mcast_addr, McastPort);
std::memcpy(&mcast_addr_, &mcast_ss, mcast_sslen);
/* we want to join that LPD multicast group */
struct ip_mreq mcast_req = {};
ip_mreq mcast_req = {};
mcast_req.imr_multiaddr = mcast_addr_.sin_addr;
mcast_req.imr_interface.s_addr = INADDR_ANY;
mcast_req.imr_interface = reinterpret_cast<sockaddr_in const*>(&bind_ss)->sin_addr;
if (setsockopt(
mcast_rcv_socket_,
mcast_socket_,
IPPROTO_IP,
IP_ADD_MEMBERSHIP,
reinterpret_cast<char const*>(&mcast_req),
sizeof(struct ip_mreq)) == -1)
{
return false;
}
}
/* setup datagram socket (send) */
{
unsigned char const scope = AnnounceScope;
mcast_snd_socket_ = socket(PF_INET, SOCK_DGRAM, 0);
if (mcast_snd_socket_ == TR_BAD_SOCKET)
{
return false;
}
if (evutil_make_socket_nonblocking(mcast_snd_socket_) == -1)
{
return false;
}
if (setsockopt(
mcast_snd_socket_,
SOL_SOCKET,
SO_REUSEADDR,
reinterpret_cast<char const*>(&opt_on),
sizeof(opt_on)) == -1)
{
return false;
}
if (auto [ss, sslen] = tr_socket_address::to_sockaddr(mediator_.bind_address(TR_AF_INET), {});
bind(mcast_snd_socket_, reinterpret_cast<sockaddr*>(&ss), sslen) == -1)
sizeof(mcast_req)) == -1)
{
return false;
}
/* configure outbound multicast TTL */
if (setsockopt(
mcast_snd_socket_,
mcast_socket_,
IPPROTO_IP,
IP_MULTICAST_TTL,
reinterpret_cast<char const*>(&scope),
sizeof(scope)) == -1)
reinterpret_cast<char const*>(&AnnounceScope),
sizeof(AnnounceScope)) == -1)
{
return false;
}
}
/* Note: lpd_unsolicitedMsgCounter remains 0 until the first timeout event, thus
* any announcement received during the initial interval will be discarded. */
event_.reset(event_new(event_base, mcast_rcv_socket_, EV_READ | EV_PERSIST, event_callback, this));
event_.reset(event_new(event_base, mcast_socket_, EV_READ | EV_PERSIST, event_callback, this));
event_add(event_.get(), nullptr);
tr_logAddDebug("Local Peer Discovery initialised");
@ -424,15 +371,15 @@ private:
auto addr_len = socklen_t{ sizeof(foreign_addr) };
auto foreign_msg = std::array<char, MaxDatagramLength>{};
auto const res = recvfrom(
mcast_rcv_socket_,
mcast_socket_,
std::data(foreign_msg),
MaxDatagramLength,
0,
reinterpret_cast<sockaddr*>(&foreign_addr),
&addr_len);
// If we couldn't read it or it was too big, discard it
if (res < 1 || static_cast<size_t>(res) > MaxDatagramLength)
// If we couldn't read it, discard it
if (res < 1)
{
return;
}
@ -465,7 +412,7 @@ private:
{
if (!mediator_.onPeerFound(hash_string, peer_addr, parsed->port))
{
tr_logAddDebug(fmt::format(FMT_STRING("Cannot serve torrent #{:s}"), hash_string));
tr_logAddDebug(fmt::format("Cannot serve torrent #{:s}", hash_string));
}
}
}
@ -484,7 +431,7 @@ private:
auto const needs_announce = [&now](auto& info)
{
return info.allows_lpd && (info.activity == TR_STATUS_DOWNLOAD || info.activity == TR_STATUS_SEED) &&
(info.announce_after < now);
info.announce_after < now;
};
torrents.erase(
std::remove_if(std::begin(torrents), std::end(torrents), std::not_fn(needs_announce)),
@ -496,22 +443,20 @@ private:
}
// prioritize the remaining torrents
std::sort(
std::begin(torrents),
std::end(torrents),
[](auto const& a, auto const& b)
static auto constexpr TorrentComparator = [](auto const& a, auto const& b)
{
if (a.activity != b.activity)
{
if (a.activity != b.activity)
{
return a.activity < b.activity;
}
return a.activity < b.activity;
}
if (a.announce_after != b.announce_after)
{
return a.announce_after < b.announce_after;
}
return false;
});
if (a.announce_after != b.announce_after)
{
return a.announce_after < b.announce_after;
}
return false;
};
std::sort(std::begin(torrents), std::end(torrents), TorrentComparator);
// cram in as many as will fit in a message
auto const baseline_size = std::size(makeAnnounceMsg(cookie_, mediator_.port(), {}));
@ -565,7 +510,7 @@ private:
auto const announce = makeAnnounceMsg(cookie_, mediator_.port(), info_hash_strings);
TR_ASSERT(std::size(announce) <= MaxDatagramLength);
auto const res = sendto(
mcast_snd_socket_,
mcast_socket_,
std::data(announce),
std::size(announce),
0,
@ -577,8 +522,7 @@ private:
std::string const cookie_ = makeCookie();
Mediator& mediator_;
tr_socket_t mcast_rcv_socket_ = TR_BAD_SOCKET; /**<separate multicast receive socket */
tr_socket_t mcast_snd_socket_ = TR_BAD_SOCKET; /**<and multicast send socket */
tr_socket_t mcast_socket_ = TR_BAD_SOCKET; /**multicast socket */
libtransmission::evhelpers::event_unique_ptr event_;
static auto constexpr MaxDatagramLength = size_t{ 1400 };