// Except where noted, This file Copyright © Johannes Lieder. // It may be used under the MIT (SPDX: MIT) license. // License text can be found in the licenses/ folder. #include #include #include #include // std::byte #include // uint16_t #include #include // time_t #include #include #include #include #include #include #ifdef _WIN32 #include #else #include /* socket(), bind() */ #include /* sockaddr_in */ #endif #include #include #include "libtransmission/transmission.h" #include "libtransmission/crypto-utils.h" // for tr_rand_obj() #include "libtransmission/log.h" #include "libtransmission/net.h" #include "libtransmission/timer.h" #include "libtransmission/tr-assert.h" #include "libtransmission/tr-lpd.h" #include "libtransmission/utils.h" // for tr_net_init() #include "libtransmission/utils-ev.h" // for tr_net_init() using namespace std::literals; // Code in this namespace Copyright © Mnemosyne LLC. // It may be used under GPLv2 (SPDX: GPL-2.0-only), GPLv3 (SPDX: GPL-3.0-only), MIT (SPDX: MIT), // or any future license endorsed by Mnemosyne LLC. // License text can be found in the licenses/ folder. namespace { // opaque value, allowing the sending client to filter out its // own announces if it receives them via multicast loopback auto makeCookie() { static auto constexpr Pool = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"sv; auto buf = tr_rand_obj>(); for (auto& ch : buf) { ch = Pool[static_cast(ch) % std::size(Pool)]; } return std::string{ std::data(buf), std::size(buf) }; } constexpr char const* const McastGroup = "239.192.152.143"; /**\r\n * Port: \r\n * Infohash: \r\n * cookie: \r\n * \r\n * \r\n * ``` * * An announce may contain multiple, consecutive Infohash headers * to announce the participation in more than one torrent. This * may not be supported by older implementations. When sending * multiple infohashes the packet length should not exceed 1400 * bytes to avoid MTU/fragmentation problems. */ auto makeAnnounceMsg(std::string_view cookie, tr_port port, std::vector const& info_hash_strings) { auto ret = fmt::format( "BT-SEARCH * HTTP/1.1\r\n" "Host: {:s}:{:d}\r\n" "Port: {:d}\r\n", McastGroup, McastPort.host(), port.host()); for (auto const& info_hash : info_hash_strings) { ret += fmt::format("Infohash: {:s}\r\n", tr_strupper(info_hash)); } if (!std::empty(cookie)) { ret += fmt::format("cookie: {:s}\r\n", cookie); } return ret + "\r\n\r\n"; } struct ParsedAnnounce { int major; int minor; tr_port port; std::vector info_hash_strings; std::string_view cookie; }; std::optional parseAnnounceMsg(std::string_view announce) { static auto constexpr CrLf = "\r\n"sv; auto ret = ParsedAnnounce{}; // get major, minor auto key = "BT-SEARCH * HTTP/"sv; if (auto const pos = announce.find(key); pos != std::string_view::npos) { // parse `${major}.${minor}` auto walk = announce.substr(pos + std::size(key)); if (auto const major = tr_num_parse(walk, &walk); major && tr_strv_starts_with(walk, '.')) { ret.major = *major; } else { return {}; } walk.remove_prefix(1); // the '.' between major and minor if (auto const minor = tr_num_parse(walk, &walk); minor && tr_strv_starts_with(walk, CrLf)) { ret.minor = *minor; } else { return {}; } } key = "Port: "sv; if (auto const pos = announce.find(key); pos != std::string_view::npos) { auto walk = announce.substr(pos + std::size(key)); if (auto const port = tr_num_parse(walk, &walk); port && tr_strv_starts_with(walk, CrLf)) { ret.port = tr_port::from_host(*port); } else { return {}; } } key = "cookie: "sv; if (auto const pos = announce.find(key); pos != std::string_view::npos) { auto walk = announce.substr(pos + std::size(key)); if (auto const end = walk.find(CrLf); end != std::string_view::npos) { ret.cookie = walk.substr(0, end); } else { return {}; } } key = "Infohash: "sv; for (;;) { if (auto const pos = announce.find(key); pos != std::string_view::npos) { announce.remove_prefix(pos + std::size(key)); } else { break; } if (auto const end = announce.find(CrLf); end != std::string_view::npos) { ret.info_hash_strings.push_back(announce.substr(0, end)); announce.remove_prefix(end + std::size(CrLf)); } else { return {}; } } return ret; } } // namespace class tr_lpd_impl final : public tr_lpd { public: tr_lpd_impl(Mediator& mediator, struct event_base* event_base) : mediator_{ mediator } , announce_timer_{ mediator.timerMaker().create([this]() { announceUpkeep(); }) } , dos_timer_{ mediator.timerMaker().create([this]() { dosUpkeep(); }) } { if (!init(event_base)) { return; } announce_timer_->start_repeating(AnnounceInterval); announceUpkeep(); dos_timer_->start_repeating(DosInterval); dosUpkeep(); } tr_lpd_impl(tr_lpd_impl&&) = delete; tr_lpd_impl(tr_lpd_impl const&) = delete; tr_lpd_impl& operator=(tr_lpd_impl&&) = delete; tr_lpd_impl& operator=(tr_lpd_impl const&) = delete; ~tr_lpd_impl() override { event_.reset(); if (mcast_socket_ != TR_BAD_SOCKET) { tr_net_close_socket(mcast_socket_); } tr_logAddTrace("Done uninitialising Local Peer Discovery"); } private: bool init(struct event_base* event_base) { if (initImpl(event_base)) { return true; } auto const err = sockerrno; tr_net_close_socket(mcast_socket_); mcast_socket_ = TR_BAD_SOCKET; tr_logAddWarn(fmt::format( _("Couldn't initialize LPD: {error} ({error_code})"), fmt::arg("error", tr_strerror(err)), fmt::arg("error_code", err))); return false; } /** * @brief Initializes Local Peer Discovery for this node * * For the most part, this means setting up an appropriately configured multicast socket * and event-based message handling. * * @remark Since the LPD service does not use another protocol family yet, this code is * IPv4 only for the time being. */ bool initImpl(struct event_base* event_base) { auto const opt_on = 1; static_assert(AnnounceScope > 0); tr_logAddDebug("Initialising Local Peer Discovery"); /* setup datagram socket */ { mcast_socket_ = socket(PF_INET, SOCK_DGRAM, 0); if (mcast_socket_ == TR_BAD_SOCKET) { return false; } if (evutil_make_socket_nonblocking(mcast_socket_) == -1) { return false; } if (setsockopt(mcast_socket_, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&opt_on), sizeof(opt_on)) == -1) { return false; } #if HAVE_SO_REUSEPORT if (setsockopt(mcast_socket_, SOL_SOCKET, SO_REUSEPORT, reinterpret_cast(&opt_on), sizeof(opt_on)) == -1) { return false; } #endif auto const [bind_ss, bind_sslen] = tr_socket_address::to_sockaddr(mediator_.bind_address(TR_AF_INET), McastPort); if (bind(mcast_socket_, reinterpret_cast(&bind_ss), bind_sslen) == -1) { return false; } auto const mcast_addr = tr_address::from_string(McastGroup); TR_ASSERT(mcast_addr); auto const [mcast_ss, mcast_sslen] = tr_socket_address::to_sockaddr(*mcast_addr, McastPort); std::memcpy(&mcast_addr_, &mcast_ss, mcast_sslen); /* we want to join that LPD multicast group */ ip_mreq mcast_req = {}; mcast_req.imr_multiaddr = mcast_addr_.sin_addr; mcast_req.imr_interface = reinterpret_cast(&bind_ss)->sin_addr; if (setsockopt( mcast_socket_, IPPROTO_IP, IP_ADD_MEMBERSHIP, reinterpret_cast(&mcast_req), sizeof(mcast_req)) == -1) { return false; } /* configure outbound multicast TTL */ if (setsockopt( mcast_socket_, IPPROTO_IP, IP_MULTICAST_TTL, reinterpret_cast(&AnnounceScope), sizeof(AnnounceScope)) == -1) { return false; } } event_.reset(event_new(event_base, mcast_socket_, EV_READ | EV_PERSIST, event_callback, this)); event_add(event_.get(), nullptr); tr_logAddDebug("Local Peer Discovery initialised"); return true; } /** * @brief Processing of timeout notifications and incoming data on the socket * @note maximum rate of read events is limited according to @a lpd_maxAnnounceCap * @see DoS */ static void event_callback(evutil_socket_t /*s*/, short type, void* vself) { if ((type & EV_READ) != 0) { static_cast(vself)->onCanRead(); } } void onCanRead() { if (!mediator_.allowsLPD()) { return; } // process announcement from foreign peer struct sockaddr_in foreign_addr = {}; auto addr_len = socklen_t{ sizeof(foreign_addr) }; auto foreign_msg = std::array{}; auto const res = recvfrom( mcast_socket_, std::data(foreign_msg), MaxDatagramLength, 0, reinterpret_cast(&foreign_addr), &addr_len); // If we couldn't read it, discard it if (res < 1) { return; } // If it doesn't look like a BEP14 message, discard it auto const msg = std::string_view{ std::data(foreign_msg), static_cast(res) }; if (static auto constexpr SearchKey = "BT-SEARCH * HTTP/"sv; msg.find(SearchKey) == std::string_view::npos) { return; } // If we're receiving too many, discard it if (++messages_received_since_upkeep_ > MaxIncomingPerUpkeep) { return; } // If it's an invalid message or the wrong protocol version, discard it. // Note this comes *after* incrementing the count since there is some // small CPU overhead in parsing, so don't do it for *every* message auto const parsed = parseAnnounceMsg(msg); if (!parsed || parsed->major != 1 || parsed->minor < 1 || parsed->cookie == cookie_) { tr_logAddTrace("Discarded invalid multicast message"); return; } auto [peer_addr, compact] = tr_address::from_compact_ipv4(reinterpret_cast(&foreign_addr.sin_addr)); for (auto const& hash_string : parsed->info_hash_strings) { if (!mediator_.onPeerFound(hash_string, peer_addr, parsed->port)) { tr_logAddDebug(fmt::format("Cannot serve torrent #{:s}", hash_string)); } } } void announceUpkeep() { if (!mediator_.allowsLPD()) { return; } auto torrents = mediator_.torrents(); // remove torrents that don't need to be announced auto const now = tr_time(); auto const needs_announce = [&now](auto& info) { return info.allows_lpd && (info.activity == TR_STATUS_DOWNLOAD || info.activity == TR_STATUS_SEED) && info.announce_after < now; }; torrents.erase( std::remove_if(std::begin(torrents), std::end(torrents), std::not_fn(needs_announce)), std::end(torrents)); if (std::empty(torrents)) { return; } // prioritize the remaining torrents static auto constexpr TorrentComparator = [](auto const& a, auto const& b) { if (a.activity != b.activity) { return a.activity < b.activity; } if (a.announce_after != b.announce_after) { return a.announce_after < b.announce_after; } return false; }; std::sort(std::begin(torrents), std::end(torrents), TorrentComparator); // cram in as many as will fit in a message auto const baseline_size = std::size(makeAnnounceMsg(cookie_, mediator_.port(), {})); auto const size_with_one = std::size(makeAnnounceMsg(cookie_, mediator_.port(), { torrents.front().info_hash_str })); auto const size_per_hash = size_with_one - baseline_size; auto const max_torrents_per_announce = (MaxDatagramLength - baseline_size) / size_per_hash; auto info_hash_strings = std::vector{}; info_hash_strings.resize(std::min(std::size(torrents), max_torrents_per_announce)); std::transform( std::begin(torrents), std::begin(torrents) + std::size(info_hash_strings), std::begin(info_hash_strings), [](auto const& tor) { return tor.info_hash_str; }); if (!sendAnnounce(info_hash_strings)) { return; } auto const next_announce_after = now + TorrentAnnounceIntervalSec; for (auto const& info_hash_string : info_hash_strings) { mediator_.setNextAnnounceTime(info_hash_string, next_announce_after); } } void dosUpkeep() { if (messages_received_since_upkeep_ > MaxIncomingPerUpkeep) { tr_logAddTrace(fmt::format( "Dropped {} announces in the last interval (max. {} allowed)", messages_received_since_upkeep_ - MaxIncomingPerUpkeep, MaxIncomingPerUpkeep)); } messages_received_since_upkeep_ = 0; } /** * @brief Announce the given torrent on the local network * * @return Returns a success flag * * Send a query for torrent t out to the LPD multicast group (or the LAN, for that * matter). A listening client on the same network might react by adding us to his * peer pool for torrent t. */ bool sendAnnounce(std::vector const& info_hash_strings) { auto const announce = makeAnnounceMsg(cookie_, mediator_.port(), info_hash_strings); TR_ASSERT(std::size(announce) <= MaxDatagramLength); auto const res = sendto( mcast_socket_, std::data(announce), std::size(announce), 0, reinterpret_cast(&mcast_addr_), sizeof(mcast_addr_)); auto const sent = res == static_cast(std::size(announce)); return sent; } std::string const cookie_ = makeCookie(); Mediator& mediator_; tr_socket_t mcast_socket_ = TR_BAD_SOCKET; /**multicast socket */ libtransmission::evhelpers::event_unique_ptr event_; static auto constexpr MaxDatagramLength = size_t{ 1400 }; sockaddr_in mcast_addr_ = {}; /** announce_timer_; // Flood Protection: // To protect against message flooding, stop processing search messages // after processing N per upkeep. If we hit that limit, we're either // in a *very* crowded multicast group or a hostile host is sending us // bogus data. Better to drop a few packets than get DoS'ed. static auto constexpr DosInterval = 5s; std::unique_ptr dos_timer_; static auto constexpr MaxIncomingPerSecond = 10; static auto constexpr MaxIncomingPerUpkeep = std::chrono::duration_cast(DosInterval).count() * MaxIncomingPerSecond; // @brief throw away messages after this number exceeds MaxIncomingPerUpkeep size_t messages_received_since_upkeep_ = 0U; static auto constexpr TorrentAnnounceIntervalSec = time_t{ 240U }; // how frequently to reannounce the same torrent static auto constexpr TtlSameSubnet = 1; static auto constexpr AnnounceScope = int{ TtlSameSubnet }; /** tr_lpd::create(Mediator& mediator, struct event_base* event_base) { return std::make_unique(mediator, event_base); }