refactor: incremental announcer refactor pt. 2 (#4214)

This commit is contained in:
Charles Kerr 2022-11-18 23:00:25 -06:00 committed by GitHub
parent 62cad53df8
commit d27c4c59ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 393 additions and 481 deletions

View File

@ -11,6 +11,7 @@
#include <array>
#include <cstdint> // uint64_t
#include <functional>
#include <optional>
#include <string>
#include <string_view>
@ -25,17 +26,9 @@
struct tr_url_parsed_t;
void tr_tracker_http_scrape(
tr_session const* session,
tr_scrape_request const* req,
tr_scrape_response_func response_func,
void* user_data);
void tr_tracker_http_scrape(tr_session const* session, tr_scrape_request const& req, tr_scrape_response_func on_response);
void tr_tracker_http_announce(
tr_session const* session,
tr_announce_request const* req,
tr_announce_response_func response_func,
void* user_data);
void tr_tracker_http_announce(tr_session const* session, tr_announce_request const& req, tr_announce_response_func on_response);
void tr_announcerParseHttpAnnounceResponse(tr_announce_response& response, std::string_view benc, std::string_view log_name);

View File

@ -46,19 +46,18 @@ using namespace std::literals;
*****
****/
static std::string_view get_event_string(tr_announce_request const* req)
[[nodiscard]] static constexpr std::string_view get_event_string(tr_announce_request const& req)
{
return req->partial_seed && (req->event != TR_ANNOUNCE_EVENT_STOPPED) ? "paused"sv :
tr_announce_event_get_string(req->event);
return req.partial_seed && (req.event != TR_ANNOUNCE_EVENT_STOPPED) ? "paused"sv : tr_announce_event_get_string(req.event);
}
static void announce_url_new(tr_urlbuf& url, tr_session const* session, tr_announce_request const* req)
static void announce_url_new(tr_urlbuf& url, tr_session const* session, tr_announce_request const& req)
{
url.clear();
auto out = std::back_inserter(url);
auto escaped_info_hash = tr_urlbuf{};
tr_urlPercentEncode(std::back_inserter(escaped_info_hash), req->info_hash);
tr_urlPercentEncode(std::back_inserter(escaped_info_hash), req.info_hash);
fmt::format_to(
out,
@ -73,25 +72,25 @@ static void announce_url_new(tr_urlbuf& url, tr_session const* session, tr_annou
"&key={key}"
"&compact=1"
"&supportcrypto=1",
fmt::arg("url", req->announce_url),
fmt::arg("sep", tr_strvContains(req->announce_url.sv(), '?') ? '&' : '?'),
fmt::arg("url", req.announce_url),
fmt::arg("sep", tr_strvContains(req.announce_url.sv(), '?') ? '&' : '?'),
fmt::arg("info_hash", std::data(escaped_info_hash)),
fmt::arg("peer_id", std::string_view{ std::data(req->peer_id), std::size(req->peer_id) }),
fmt::arg("port", req->port.host()),
fmt::arg("uploaded", req->up),
fmt::arg("downloaded", req->down),
fmt::arg("left", req->leftUntilComplete),
fmt::arg("numwant", req->numwant),
fmt::arg("key", req->key));
fmt::arg("peer_id", std::string_view{ std::data(req.peer_id), std::size(req.peer_id) }),
fmt::arg("port", req.port.host()),
fmt::arg("uploaded", req.up),
fmt::arg("downloaded", req.down),
fmt::arg("left", req.leftUntilComplete),
fmt::arg("numwant", req.numwant),
fmt::arg("key", req.key));
if (session->encryptionMode() == TR_ENCRYPTION_REQUIRED)
{
fmt::format_to(out, "&requirecrypto=1");
}
if (req->corrupt != 0)
if (req.corrupt != 0)
{
fmt::format_to(out, "&corrupt={}", req->corrupt);
fmt::format_to(out, "&corrupt={}", req.corrupt);
}
if (auto const str = get_event_string(req); !std::empty(str))
@ -99,9 +98,9 @@ static void announce_url_new(tr_urlbuf& url, tr_session const* session, tr_annou
fmt::format_to(out, "&event={}", str);
}
if (!std::empty(req->tracker_id))
if (!std::empty(req.tracker_id))
{
fmt::format_to(out, "&trackerid={}", req->tracker_id);
fmt::format_to(out, "&trackerid={}", req.tracker_id);
}
}
@ -298,11 +297,17 @@ void tr_announcerParseHttpAnnounceResponse(tr_announce_response& response, std::
struct http_announce_data
{
http_announce_data(tr_sha1_digest_t info_hash_in, tr_announce_response_func on_response_in, std::string_view log_name_in)
: info_hash{ info_hash_in }
, on_response{ std::move(on_response_in) }
, log_name{ log_name_in }
{
}
tr_sha1_digest_t info_hash = {};
std::optional<tr_announce_response> previous_response;
tr_announce_response_func response_func = nullptr;
void* response_func_user_data = nullptr;
tr_announce_response_func on_response;
bool http_success = false;
uint8_t requests_sent_count = {};
@ -352,7 +357,7 @@ static void onAnnounceDone(tr_web::FetchResponse const& web_response)
// If another request already succeeded (or we don't have a registered callback),
// skip processing this response:
if (!data->http_success && data->response_func != nullptr)
if (!data->http_success && data->on_response)
{
tr_announce_response response;
response.info_hash = data->info_hash;
@ -361,7 +366,7 @@ static void onAnnounceDone(tr_web::FetchResponse const& web_response)
if (data->http_success)
{
data->response_func(&response, data->response_func_user_data);
data->on_response(response);
}
else if (data->requests_answered_count == data->requests_sent_count)
{
@ -374,7 +379,7 @@ static void onAnnounceDone(tr_web::FetchResponse const& web_response)
response_used = &*data->previous_response;
}
data->response_func(response_used, data->response_func_user_data);
data->on_response(*response_used);
}
else
{
@ -399,15 +404,10 @@ static void onAnnounceDone(tr_web::FetchResponse const& web_response)
void tr_tracker_http_announce(
tr_session const* session,
tr_announce_request const* request,
tr_announce_response_func response_func,
void* response_func_user_data)
tr_announce_request const& request,
tr_announce_response_func on_response)
{
auto* const d = new http_announce_data();
d->response_func = response_func;
d->response_func_user_data = response_func_user_data;
d->info_hash = request->info_hash;
d->log_name = request->log_name;
auto* const d = new http_announce_data{ request.info_hash, std::move(on_response), request.log_name };
/* There are two alternative techniques for announcing both IPv4 and
IPv6 addresses. Previous version of BEP-7 suggests adding "ipv4="
@ -431,7 +431,7 @@ void tr_tracker_http_announce(
auto do_make_request = [&](std::string_view const& protocol_name, tr_web::FetchOptions&& opt)
{
tr_logAddTrace(fmt::format("Sending {} announce to libcurl: '{}'", protocol_name, opt.url), request->log_name);
tr_logAddTrace(fmt::format("Sending {} announce to libcurl: '{}'", protocol_name, opt.url), request.log_name);
session->fetch(std::move(opt));
};
@ -606,25 +606,50 @@ void tr_announcerParseHttpScrapeResponse(tr_scrape_response& response, std::stri
}
}
struct scrape_data
class scrape_data
{
tr_scrape_response response = {};
tr_scrape_response_func response_func = nullptr;
void* response_func_user_data = nullptr;
std::string log_name;
public:
scrape_data(tr_scrape_response_func response_func, std::string_view log_name)
: response_func_{ std::move(response_func) }
, log_name_{ log_name }
{
}
[[nodiscard]] constexpr auto& response() noexcept
{
return response_;
}
[[nodiscard]] constexpr auto const& log_name() const noexcept
{
return log_name_;
}
void invoke_callback()
{
if (response_func_)
{
response_func_(response_);
}
}
private:
tr_scrape_response response_ = {};
tr_scrape_response_func response_func_ = {};
std::string log_name_;
};
static void onScrapeDone(tr_web::FetchResponse const& web_response)
{
auto const& [status, body, did_connect, did_timeout, vdata] = web_response;
auto* const data = static_cast<struct scrape_data*>(vdata);
auto* const data = static_cast<scrape_data*>(vdata);
tr_scrape_response& response = data->response;
auto& response = data->response();
response.did_connect = did_connect;
response.did_timeout = did_timeout;
auto const scrape_url_sv = response.scrape_url.sv();
tr_logAddTrace(fmt::format("Got scrape response for '{}'", scrape_url_sv), data->log_name);
tr_logAddTrace(fmt::format("Got scrape response for '{}'", scrape_url_sv), data->log_name());
if (status != HTTP_OK)
{
@ -633,55 +658,44 @@ static void onScrapeDone(tr_web::FetchResponse const& web_response)
}
else if (!std::empty(body))
{
tr_announcerParseHttpScrapeResponse(response, body, data->log_name);
}
if (data->response_func != nullptr)
{
data->response_func(&data->response, data->response_func_user_data);
tr_announcerParseHttpScrapeResponse(response, body, data->log_name());
}
data->invoke_callback();
delete data;
}
static void scrape_url_new(tr_pathbuf& scrape_url, tr_scrape_request const* req)
static void scrape_url_new(tr_pathbuf& scrape_url, tr_scrape_request const& req)
{
scrape_url = req->scrape_url.sv();
scrape_url = req.scrape_url.sv();
char delimiter = tr_strvContains(scrape_url, '?') ? '&' : '?';
for (int i = 0; i < req->info_hash_count; ++i)
for (int i = 0; i < req.info_hash_count; ++i)
{
scrape_url.append(delimiter, "info_hash=");
tr_urlPercentEncode(std::back_inserter(scrape_url), req->info_hash[i]);
tr_urlPercentEncode(std::back_inserter(scrape_url), req.info_hash[i]);
delimiter = '&';
}
}
void tr_tracker_http_scrape(
tr_session const* session,
tr_scrape_request const* request,
tr_scrape_response_func response_func,
void* response_func_user_data)
void tr_tracker_http_scrape(tr_session const* session, tr_scrape_request const& request, tr_scrape_response_func on_response)
{
auto* d = new scrape_data();
d->response.scrape_url = request->scrape_url;
d->response_func = response_func;
d->response_func_user_data = response_func_user_data;
d->response.row_count = request->info_hash_count;
auto* d = new scrape_data{ std::move(on_response), request.log_name };
for (int i = 0; i < d->response.row_count; ++i)
auto& response = d->response();
response.scrape_url = request.scrape_url;
response.row_count = request.info_hash_count;
for (int i = 0; i < response.row_count; ++i)
{
d->response.rows[i].info_hash = request->info_hash[i];
d->response.rows[i].seeders = -1;
d->response.rows[i].leechers = -1;
d->response.rows[i].downloads = -1;
response.rows[i].info_hash = request.info_hash[i];
response.rows[i].seeders = -1;
response.rows[i].leechers = -1;
response.rows[i].downloads = -1;
}
d->log_name = request->log_name;
auto scrape_url = tr_pathbuf{};
scrape_url_new(scrape_url, request);
tr_logAddTrace(fmt::format("Sending scrape to libcurl: '{}'", scrape_url), request->log_name);
tr_logAddTrace(fmt::format("Sending scrape to libcurl: '{}'", scrape_url), request.log_name);
auto options = tr_web::FetchOptions{ scrape_url, onScrapeDone, d };
options.timeout_secs = 30L;
options.sndbuf = 4096;

View File

@ -72,9 +72,8 @@ enum tau_action_t
struct tau_scrape_request
{
tau_scrape_request(tr_scrape_request const& in, tr_scrape_response_func callback, void* user_data)
: callback_{ callback }
, user_data_{ user_data }
tau_scrape_request(tr_scrape_request const& in, tr_scrape_response_func on_response)
: on_response_{ std::move(on_response) }
{
this->response.scrape_url = in.scrape_url;
this->response.row_count = in.info_hash_count;
@ -97,16 +96,16 @@ struct tau_scrape_request
this->payload.insert(std::end(this->payload), std::begin(buf), std::end(buf));
}
[[nodiscard]] constexpr auto hasCallback() const noexcept
[[nodiscard]] auto has_callback() const noexcept
{
return callback_ != nullptr;
return !!on_response_;
}
void requestFinished()
{
if (callback_ != nullptr)
if (on_response_)
{
callback_(&response, user_data_);
on_response_(response);
}
}
@ -156,8 +155,7 @@ struct tau_scrape_request
tr_scrape_response response = {};
private:
tr_scrape_response_func const callback_;
void* const user_data_;
tr_scrape_response_func on_response_;
};
/****
@ -166,13 +164,8 @@ private:
struct tau_announce_request
{
tau_announce_request(
uint32_t announce_ip,
tr_announce_request const& in,
tr_announce_response_func callback,
void* user_data)
: callback_{ callback }
, user_data_{ user_data }
tau_announce_request(uint32_t announce_ip, tr_announce_request const& in, tr_announce_response_func on_response)
: on_response_{ std::move(on_response) }
{
response.seeders = -1;
response.leechers = -1;
@ -196,16 +189,16 @@ struct tau_announce_request
payload.insert(std::end(payload), std::begin(buf), std::end(buf));
}
[[nodiscard]] constexpr auto hasCallback() const noexcept
[[nodiscard]] auto has_callback() const noexcept
{
return callback_ != nullptr;
return !!on_response_;
}
void requestFinished()
{
if (callback_ != nullptr)
if (on_response_)
{
callback_(&this->response, user_data_);
on_response_(this->response);
}
}
@ -278,8 +271,7 @@ private:
}
}
tr_announce_response_func const callback_;
void* const user_data_;
tr_announce_response_func on_response_;
};
/****
@ -524,7 +516,7 @@ private:
req.sent_at = now;
send_request(std::data(req.payload), std::size(req.payload));
if (req.hasCallback())
if (req.has_callback())
{
++it;
continue;
@ -586,7 +578,7 @@ public:
{
}
void announce(tr_announce_request const& request, tr_announce_response_func response_func, void* user_data) override
void announce(tr_announce_request const& request, tr_announce_response_func on_response) override
{
auto* const tracker = getTrackerFromUrl(request.announce_url);
if (tracker == nullptr)
@ -597,11 +589,11 @@ public:
// Since size of IP field is only 4 bytes long, we can only announce IPv4 addresses
auto const addr = mediator_.announceIP();
uint32_t const announce_ip = addr && addr->isIPv4() ? addr->addr.addr4.s_addr : 0;
tracker->announces.emplace_back(announce_ip, request, response_func, user_data);
tracker->announces.emplace_back(announce_ip, request, std::move(on_response));
tracker->upkeep(false);
}
void scrape(tr_scrape_request const& request, tr_scrape_response_func response_func, void* user_data) override
void scrape(tr_scrape_request const& request, tr_scrape_response_func on_response) override
{
auto* const tracker = getTrackerFromUrl(request.scrape_url);
if (tracker == nullptr)
@ -609,7 +601,7 @@ public:
return;
}
tracker->scrapes.emplace_back(request, response_func, user_data);
tracker->scrapes.emplace_back(request, std::move(on_response));
tracker->upkeep(false);
}

View File

@ -58,9 +58,6 @@ static auto constexpr UpkeepInterval = 500ms;
static auto constexpr MaxAnnouncesPerUpkeep = int{ 20 };
static auto constexpr MaxScrapesPerUpkeep = int{ 20 };
/* this is how often to call the UDP tracker upkeep */
static auto constexpr TauUpkeepIntervalSecs = int{ 5 };
/* how many infohashes to remove when we get a scrape-too-long error */
static auto constexpr TrMultiscrapeStep = int{ 5 };
@ -73,11 +70,11 @@ namespace
struct StopsCompare
{
[[nodiscard]] static int compare(tr_announce_request const* a, tr_announce_request const* b) noexcept // <=>
[[nodiscard]] static constexpr auto compare(tr_announce_request const& one, tr_announce_request const& two) noexcept // <=>
{
// primary key: volume of data transferred
auto const ax = a->up + a->down;
auto const bx = b->up + b->down;
auto const ax = one.up + one.down;
auto const bx = two.up + two.down;
if (ax < bx)
{
return -1;
@ -88,21 +85,20 @@ struct StopsCompare
}
// secondary key: the torrent's info_hash
if (a->info_hash < b->info_hash)
for (size_t i = 0, n = sizeof(tr_sha1_digest_t); i < n; ++i)
{
return -1;
}
if (a->info_hash > b->info_hash)
{
return 1;
if (one.info_hash[i] != two.info_hash[i])
{
return one.info_hash[i] < two.info_hash[i] ? -1 : 1;
}
}
// tertiary key: the tracker's announce url
if (a->announce_url < b->announce_url)
if (one.announce_url < two.announce_url)
{
return -1;
}
if (a->announce_url > b->announce_url)
if (one.announce_url > two.announce_url)
{
return 1;
}
@ -110,7 +106,7 @@ struct StopsCompare
return 0;
}
[[nodiscard]] bool operator()(tr_announce_request const* one, tr_announce_request const* two) const noexcept
[[nodiscard]] constexpr auto operator()(tr_announce_request const& one, tr_announce_request const& two) const noexcept
{
return compare(one, two) < 0;
}
@ -128,7 +124,7 @@ struct tr_scrape_info
tr_interned_string scrape_url;
tr_scrape_info(tr_interned_string scrape_url_in, int const multiscrape_max_in)
constexpr tr_scrape_info(tr_interned_string scrape_url_in, int const multiscrape_max_in)
: multiscrape_max{ multiscrape_max_in }
, scrape_url{ scrape_url_in }
{
@ -138,66 +134,113 @@ struct tr_scrape_info
/**
* "global" (per-tr_session) fields
*/
struct tr_announcer
class tr_announcer_impl final : public tr_announcer
{
explicit tr_announcer(tr_session* session_in)
public:
explicit tr_announcer_impl(tr_session* session_in, tr_announcer_udp& announcer_udp)
: session{ session_in }
, upkeep_timer{ session_in->timerMaker().create() }
, announcer_udp_{ announcer_udp }
, upkeep_timer_{ session_in->timerMaker().create() }
{
upkeep_timer->setCallback([this]() { this->upkeep(); });
upkeep_timer->startRepeating(UpkeepInterval);
upkeep_timer_->setCallback([this]() { this->upkeep(); });
upkeep_timer_->startRepeating(UpkeepInterval);
}
~tr_announcer() = default;
tr_announcer(tr_announcer&&) = delete;
tr_announcer(tr_announcer const&) = delete;
tr_announcer& operator=(tr_announcer&&) = delete;
tr_announcer& operator=(tr_announcer const&) = delete;
~tr_announcer_impl() override
{
flushCloseMessages();
}
tr_announcer_impl(tr_announcer_impl&&) = delete;
tr_announcer_impl(tr_announcer_impl const&) = delete;
tr_announcer_impl& operator=(tr_announcer_impl&&) = delete;
tr_announcer_impl& operator=(tr_announcer_impl const&) = delete;
tr_torrent_announcer* addTorrent(tr_torrent* tor, tr_tracker_callback callback, void* callback_data) override;
void startTorrent(tr_torrent* tor) override;
void stopTorrent(tr_torrent* tor) override;
void resetTorrent(tr_torrent* tor) override;
void removeTorrent(tr_torrent* tor) override;
void upkeep();
std::set<tr_announce_request*, StopsCompare> stops;
std::map<tr_interned_string, tr_scrape_info> scrape_info;
void onAnnounceDone(int tier_id, tr_announce_event event, bool is_running_on_success, tr_announce_response const& response);
void onScrapeDone(tr_scrape_response const& response);
tr_session* const session;
std::unique_ptr<libtransmission::Timer> const upkeep_timer;
time_t tau_upkeep_at = 0;
int const key = tr_rand_int(INT_MAX);
};
static tr_scrape_info* tr_announcerGetScrapeInfo(tr_announcer* announcer, tr_interned_string url)
{
if (std::empty(url))
[[nodiscard]] tr_scrape_info* scrape_info(tr_interned_string url)
{
return nullptr;
if (std::empty(url))
{
return nullptr;
}
auto const [it, is_new] = scrape_info_.try_emplace(url, url, TR_MULTISCRAPE_MAX);
return &it->second;
}
auto& scrapes = announcer->scrape_info;
auto const [it, is_new] = scrapes.try_emplace(url, url, TR_MULTISCRAPE_MAX);
return &it->second;
}
void scrape(tr_scrape_request const& request, tr_scrape_response_func on_response)
{
if (auto const scrape_sv = request.scrape_url.sv();
tr_strvStartsWith(scrape_sv, "http://"sv) || tr_strvStartsWith(scrape_sv, "https://"sv))
{
tr_tracker_http_scrape(session, request, std::move(on_response));
}
else if (tr_strvStartsWith(scrape_sv, "udp://"sv))
{
announcer_udp_.scrape(request, std::move(on_response));
}
else
{
tr_logAddError(fmt::format(_("Unsupported URL: '{url}'"), fmt::arg("url", scrape_sv)));
}
}
void tr_announcerInit(tr_session* session)
void announce(tr_announce_request const& request, tr_announce_response_func on_response)
{
if (auto const announce_sv = request.announce_url.sv();
tr_strvStartsWith(announce_sv, "http://"sv) || tr_strvStartsWith(announce_sv, "https://"sv))
{
tr_tracker_http_announce(session, request, std::move(on_response));
}
else if (tr_strvStartsWith(announce_sv, "udp://"sv))
{
announcer_udp_.announce(request, std::move(on_response));
}
else
{
tr_logAddWarn(fmt::format(_("Unsupported URL: '{url}'"), fmt::arg("url", announce_sv)));
}
}
tr_session* const session;
int const key = tr_rand_int(INT_MAX);
private:
void flushCloseMessages()
{
for (auto& stop : stops_)
{
announce(stop, {});
}
stops_.clear();
}
tr_announcer_udp& announcer_udp_;
std::map<tr_interned_string, tr_scrape_info> scrape_info_;
std::unique_ptr<libtransmission::Timer> const upkeep_timer_;
std::set<tr_announce_request, StopsCompare> stops_;
};
std::unique_ptr<tr_announcer> tr_announcer::create(tr_session* session, tr_announcer_udp& announcer_udp)
{
TR_ASSERT(session != nullptr);
auto* a = new tr_announcer{ session };
session->announcer = a;
}
static void flushCloseMessages(tr_announcer* announcer);
void tr_announcerClose(tr_session* session)
{
tr_announcer* announcer = session->announcer;
flushCloseMessages(announcer);
session->announcer = nullptr;
delete announcer;
return std::make_unique<tr_announcer_impl>(session, announcer_udp);
}
/***
@ -207,11 +250,11 @@ void tr_announcerClose(tr_session* session)
/* a row in tr_tier's list of trackers */
struct tr_tracker
{
explicit tr_tracker(tr_announcer* announcer, tr_announce_list::tracker_info const& info)
explicit tr_tracker(tr_announcer_impl* announcer, tr_announce_list::tracker_info const& info)
: host{ info.host }
, announce_url{ info.announce }
, sitename{ info.sitename }
, scrape_info{ std::empty(info.scrape) ? nullptr : tr_announcerGetScrapeInfo(announcer, info.scrape) }
, scrape_info{ std::empty(info.scrape) ? nullptr : announcer->scrape_info(info.scrape) }
, id{ info.id }
{
}
@ -278,7 +321,7 @@ tr_interned_string tr_announcerGetKey(tr_url_parsed_t const& parsed)
/** @brief A group of trackers in a single tier, as per the multitracker spec */
struct tr_tier
{
tr_tier(tr_announcer* announcer, tr_torrent* tor_in, std::vector<tr_announce_list::tracker_info const*> const& infos)
tr_tier(tr_announcer_impl* announcer, tr_torrent* tor_in, std::vector<tr_announce_list::tracker_info const*> const& infos)
: tor{ tor_in }
{
trackers.reserve(std::size(infos));
@ -486,7 +529,7 @@ private:
*/
struct tr_torrent_announcer
{
tr_torrent_announcer(tr_announcer* announcer, tr_torrent* tor)
tr_torrent_announcer(tr_announcer_impl* announcer, tr_torrent* tor)
{
// build the trackers
auto tier_to_infos = std::map<tr_tracker_tier_t, std::vector<tr_announce_list::tracker_info const*>>{};
@ -576,7 +619,7 @@ private:
}
};
static tr_tier* getTier(tr_announcer* announcer, tr_sha1_digest_t const& info_hash, int tier_id)
static tr_tier* getTier(tr_announcer_impl* announcer, tr_sha1_digest_t const& info_hash, int tier_id)
{
if (announcer == nullptr)
{
@ -596,14 +639,14 @@ static tr_tier* getTier(tr_announcer* announcer, tr_sha1_digest_t const& info_ha
**** PUBLISH
***/
static void publishMessage(tr_tier* tier, std::string_view msg, TrackerEventType type)
static void publishMessage(tr_tier* tier, std::string_view msg, tr_tracker_event::Type type)
{
if (tier != nullptr && tier->tor != nullptr && tier->tor->torrent_announcer != nullptr &&
tier->tor->torrent_announcer->callback != nullptr)
{
auto* const ta = tier->tor->torrent_announcer;
auto event = tr_tracker_event{};
event.messageType = type;
event.type = type;
event.text = msg;
if (auto const* const current_tracker = tier->currentTracker(); current_tracker != nullptr)
@ -617,17 +660,17 @@ static void publishMessage(tr_tier* tier, std::string_view msg, TrackerEventType
static void publishErrorClear(tr_tier* tier)
{
publishMessage(tier, ""sv, TR_TRACKER_ERROR_CLEAR);
publishMessage(tier, ""sv, tr_tracker_event::Type::ErrorClear);
}
static void publishWarning(tr_tier* tier, std::string_view msg)
{
publishMessage(tier, msg, TR_TRACKER_WARNING);
publishMessage(tier, msg, tr_tracker_event::Type::Warning);
}
static void publishError(tr_tier* tier, std::string_view msg)
{
publishMessage(tier, msg, TR_TRACKER_ERROR);
publishMessage(tier, msg, tr_tracker_event::Type::Error);
}
static void publishPeerCounts(tr_tier* tier, int seeders, int leechers)
@ -635,7 +678,7 @@ static void publishPeerCounts(tr_tier* tier, int seeders, int leechers)
if (tier->tor->torrent_announcer->callback != nullptr)
{
auto e = tr_tracker_event{};
e.messageType = TR_TRACKER_COUNTS;
e.type = tr_tracker_event::Type::Counts;
e.seeders = seeders;
e.leechers = leechers;
tr_logAddDebugTier(tier, fmt::format("peer counts: {} seeders, {} leechers.", seeders, leechers));
@ -649,7 +692,7 @@ static void publishPeersPex(tr_tier* tier, int seeders, int leechers, std::vecto
if (tier->tor->torrent_announcer->callback != nullptr)
{
auto e = tr_tracker_event{};
e.messageType = TR_TRACKER_PEERS;
e.type = tr_tracker_event::Type::Peers;
e.seeders = seeders;
e.leechers = leechers;
e.pex = pex;
@ -669,11 +712,11 @@ static void publishPeersPex(tr_tier* tier, int seeders, int leechers, std::vecto
****
***/
tr_torrent_announcer* tr_announcerAddTorrent(tr_torrent* tor, tr_tracker_callback callback, void* callback_data)
tr_torrent_announcer* tr_announcer_impl::addTorrent(tr_torrent* tor, tr_tracker_callback callback, void* callback_data)
{
TR_ASSERT(tr_isTorrent(tor));
auto* ta = new tr_torrent_announcer(tor->session->announcer, tor);
auto* ta = new tr_torrent_announcer{ this, tor };
ta->callback = callback;
ta->callback_data = callback_data;
return ta;
@ -802,7 +845,7 @@ static void torrentAddAnnounce(tr_torrent* tor, tr_announce_event e, time_t anno
}
}
void tr_announcerTorrentStarted(tr_torrent* tor)
void tr_announcer_impl::startTorrent(tr_torrent* tor)
{
torrentAddAnnounce(tor, TR_ANNOUNCE_EVENT_STARTED, tr_time());
}
@ -812,7 +855,7 @@ void tr_announcerManualAnnounce(tr_torrent* tor)
torrentAddAnnounce(tor, TR_ANNOUNCE_EVENT_NONE, tr_time());
}
void tr_announcerTorrentStopped(tr_torrent* tor)
void tr_announcer_impl::stopTorrent(tr_torrent* tor)
{
torrentAddAnnounce(tor, TR_ANNOUNCE_EVENT_STOPPED, tr_time());
}
@ -824,7 +867,7 @@ void tr_announcerTorrentCompleted(tr_torrent* tor)
void tr_announcerChangeMyPort(tr_torrent* tor)
{
tr_announcerTorrentStarted(tor);
torrentAddAnnounce(tor, TR_ANNOUNCE_EVENT_STARTED, tr_time());
}
/***
@ -846,35 +889,36 @@ void tr_announcerAddBytes(tr_torrent* tor, int type, uint32_t n_bytes)
****
***/
static tr_announce_request* announce_request_new(
tr_announcer const* announcer,
tr_torrent* tor,
tr_tier const* tier,
tr_announce_event event)
[[nodiscard]] static tr_announce_request create_announce_request(
tr_announcer_impl const* const announcer,
tr_torrent* const tor,
tr_tier const* const tier,
tr_announce_event const event)
{
auto const* const current_tracker = tier->currentTracker();
TR_ASSERT(current_tracker != nullptr);
auto* const req = new tr_announce_request();
req->port = announcer->session->advertisedPeerPort();
req->announce_url = current_tracker->announce_url;
req->tracker_id = current_tracker->tracker_id;
req->info_hash = tor->infoHash();
req->peer_id = tr_torrentGetPeerId(tor);
req->up = tier->byteCounts[TR_ANN_UP];
req->down = tier->byteCounts[TR_ANN_DOWN];
req->corrupt = tier->byteCounts[TR_ANN_CORRUPT];
req->leftUntilComplete = tor->hasMetainfo() ? tor->totalSize() - tor->hasTotal() : INT64_MAX;
req->event = event;
req->numwant = event == TR_ANNOUNCE_EVENT_STOPPED ? 0 : Numwant;
req->key = announcer->key;
req->partial_seed = tor->isPartialSeed();
tier->buildLogName(req->log_name, sizeof(req->log_name));
auto req = tr_announce_request{};
req.port = announcer->session->advertisedPeerPort();
req.announce_url = current_tracker->announce_url;
req.tracker_id = current_tracker->tracker_id;
req.info_hash = tor->infoHash();
req.peer_id = tr_torrentGetPeerId(tor);
req.up = tier->byteCounts[TR_ANN_UP];
req.down = tier->byteCounts[TR_ANN_DOWN];
req.corrupt = tier->byteCounts[TR_ANN_CORRUPT];
req.leftUntilComplete = tor->hasMetainfo() ? tor->totalSize() - tor->hasTotal() : INT64_MAX;
req.event = event;
req.numwant = event == TR_ANNOUNCE_EVENT_STOPPED ? 0 : Numwant;
req.key = announcer->key;
req.partial_seed = tor->isPartialSeed();
tier->buildLogName(req.log_name, sizeof(req.log_name));
return req;
}
void tr_announcerRemoveTorrent(tr_announcer* announcer, tr_torrent* tor)
void tr_announcer_impl::removeTorrent(tr_torrent* tor)
{
// FIXME(ckerr)
auto* const ta = tor->torrent_announcer;
if (ta == nullptr)
{
@ -885,17 +929,7 @@ void tr_announcerRemoveTorrent(tr_announcer* announcer, tr_torrent* tor)
{
if (tier.isRunning)
{
auto const e = TR_ANNOUNCE_EVENT_STOPPED;
auto* req = announce_request_new(announcer, tor, &tier, e);
if (announcer->stops.count(req) != 0U)
{
delete req;
}
else
{
announcer->stops.insert(req);
}
stops_.emplace(create_announce_request(this, tor, &tier, TR_ANNOUNCE_EVENT_STOPPED));
}
}
@ -903,17 +937,6 @@ void tr_announcerRemoveTorrent(tr_announcer* announcer, tr_torrent* tor)
delete ta;
}
struct announce_data
{
int tier_id = 0;
time_t time_sent = {};
tr_announce_event event = {};
tr_session* session = nullptr;
/** If the request succeeds, the value for tier's "isRunning" flag */
bool is_running_on_success = false;
};
static bool isUnregistered(char const* errmsg)
{
auto const lower = tr_strlower(errmsg != nullptr ? errmsg : "");
@ -959,20 +982,19 @@ static void on_announce_error(tr_tier* tier, char const* err, tr_announce_event
}
}
static void onAnnounceDone(tr_announce_response const* response, void* vdata)
void tr_announcer_impl::onAnnounceDone(
int tier_id,
tr_announce_event event,
bool is_running_on_success,
tr_announce_response const& response)
{
auto* const data = static_cast<announce_data*>(vdata);
tr_announcer* announcer = data->session->announcer;
tr_tier* const tier = getTier(announcer, response->info_hash, data->tier_id);
auto* const tier = getTier(this, response.info_hash, tier_id);
if (tier == nullptr)
{
delete data;
return;
}
auto const now = tr_time();
tr_announce_event const event = data->event;
tr_logAddTraceTier(
tier,
@ -990,39 +1012,39 @@ static void onAnnounceDone(tr_announce_response const* response, void* vdata)
"pex6:{} "
"err:{} "
"warn:{}",
response->did_connect,
response->did_timeout,
response->seeders,
response->leechers,
response->downloads,
response->interval,
response->min_interval,
(!std::empty(response->tracker_id) ? response->tracker_id.c_str() : "none"),
std::size(response->pex),
std::size(response->pex6),
(!std::empty(response->errmsg) ? response->errmsg.c_str() : "none"),
(!std::empty(response->warning) ? response->warning.c_str() : "none")));
response.did_connect,
response.did_timeout,
response.seeders,
response.leechers,
response.downloads,
response.interval,
response.min_interval,
(!std::empty(response.tracker_id) ? response.tracker_id.c_str() : "none"),
std::size(response.pex),
std::size(response.pex6),
(!std::empty(response.errmsg) ? response.errmsg.c_str() : "none"),
(!std::empty(response.warning) ? response.warning.c_str() : "none")));
tier->lastAnnounceTime = now;
tier->lastAnnounceTimedOut = response->did_timeout;
tier->lastAnnounceTimedOut = response.did_timeout;
tier->lastAnnounceSucceeded = false;
tier->isAnnouncing = false;
tier->manualAnnounceAllowedAt = now + tier->announceMinIntervalSec;
if (response->external_ip)
if (response.external_ip)
{
data->session->setExternalIP(*response->external_ip);
session->setExternalIP(*response.external_ip);
}
if (!response->did_connect)
if (!response.did_connect)
{
on_announce_error(tier, _("Could not connect to tracker"), event);
}
else if (response->did_timeout)
else if (response.did_timeout)
{
on_announce_error(tier, _("Tracker did not respond"), event);
}
else if (!std::empty(response->errmsg))
else if (!std::empty(response.errmsg))
{
/* If the torrent's only tracker returned an error, publish it.
Don't bother publishing if there are other trackers -- it's
@ -1030,10 +1052,10 @@ static void onAnnounceDone(tr_announce_response const* response, void* vdata)
in a torrent's metainfo... */
if (tier->tor->trackerCount() < 2)
{
publishError(tier, response->errmsg);
publishError(tier, response.errmsg);
}
on_announce_error(tier, response->errmsg.c_str(), event);
on_announce_error(tier, response.errmsg.c_str(), event);
}
else
{
@ -1049,31 +1071,31 @@ static void onAnnounceDone(tr_announce_response const* response, void* vdata)
{
tracker->consecutive_failures = 0;
if (response->seeders >= 0)
if (response.seeders >= 0)
{
tracker->seeder_count = seeders = response->seeders;
tracker->seeder_count = seeders = response.seeders;
++scrape_fields;
}
if (response->leechers >= 0)
if (response.leechers >= 0)
{
tracker->leecher_count = leechers = response->leechers;
tracker->leecher_count = leechers = response.leechers;
++scrape_fields;
}
if (response->downloads >= 0)
if (response.downloads >= 0)
{
tracker->download_count = response->downloads;
tracker->download_count = response.downloads;
++scrape_fields;
}
if (!std::empty(response->tracker_id))
if (!std::empty(response.tracker_id))
{
tracker->tracker_id = response->tracker_id;
tracker->tracker_id = response.tracker_id;
}
}
if (auto const& warning = response->warning; !std::empty(warning))
if (auto const& warning = response.warning; !std::empty(warning))
{
tier->last_announce_str = warning;
tr_logAddTraceTier(tier, fmt::format("tracker gave '{}'", warning));
@ -1084,29 +1106,29 @@ static void onAnnounceDone(tr_announce_response const* response, void* vdata)
tier->last_announce_str = _("Success");
}
if (response->min_interval != 0)
if (response.min_interval != 0)
{
tier->announceMinIntervalSec = response->min_interval;
tier->announceMinIntervalSec = response.min_interval;
}
if (response->interval != 0)
if (response.interval != 0)
{
tier->announceIntervalSec = response->interval;
tier->announceIntervalSec = response.interval;
}
if (!std::empty(response->pex))
if (!std::empty(response.pex))
{
publishPeersPex(tier, seeders, leechers, response->pex);
publishPeersPex(tier, seeders, leechers, response.pex);
}
if (!std::empty(response->pex6))
if (!std::empty(response.pex6))
{
publishPeersPex(tier, seeders, leechers, response->pex6);
publishPeersPex(tier, seeders, leechers, response.pex6);
}
publishPeerCounts(tier, seeders, leechers);
tier->isRunning = data->is_running_on_success;
tier->isRunning = is_running_on_success;
/* if the tracker included scrape fields in its announce response,
then a separate scrape isn't needed */
@ -1127,7 +1149,7 @@ static void onAnnounceDone(tr_announce_response const* response, void* vdata)
}
tier->lastAnnounceSucceeded = true;
tier->lastAnnouncePeerCount = std::size(response->pex) + std::size(response->pex6);
tier->lastAnnouncePeerCount = std::size(response.pex) + std::size(response.pex6);
if (is_stopped)
{
@ -1147,64 +1169,30 @@ static void onAnnounceDone(tr_announce_response const* response, void* vdata)
tier_announce_event_push(tier, TR_ANNOUNCE_EVENT_NONE, now + i);
}
}
delete data;
}
static void announce_request_delegate(
tr_announcer* announcer,
tr_announce_request* request,
tr_announce_response_func callback,
announce_data* callback_data)
{
tr_session* session = announcer->session;
#if 0
fprintf(stderr, "ANNOUNCE: event %s isPartialSeed %d port %d key %d numwant %d up %" PRIu64 " down %" PRIu64
" corrupt %" PRIu64 " left %" PRIu64 " url [%s] tracker_id_str [%s] peer_id [%20.20s]\n",
tr_announce_event_get_string(request->event), (int)request->partial_seed, (int)request->port, request->key,
request->numwant, request->up, request->down, request->corrupt, request->leftUntilComplete, request->url,
request->tracker_id_str, request->peer_id);
#endif
if (auto const announce_sv = request->announce_url.sv();
tr_strvStartsWith(announce_sv, "http://"sv) || tr_strvStartsWith(announce_sv, "https://"sv))
{
tr_tracker_http_announce(session, request, callback, callback_data);
}
else if (tr_strvStartsWith(announce_sv, "udp://"sv))
{
session->announcer_udp_->announce(*request, callback, callback_data);
}
else
{
tr_logAddWarn(fmt::format(_("Unsupported URL: '{url}'"), fmt::arg("url", announce_sv)));
delete callback_data;
}
delete request;
}
static void tierAnnounce(tr_announcer* announcer, tr_tier* tier)
static void tierAnnounce(tr_announcer_impl* announcer, tr_tier* tier)
{
TR_ASSERT(!tier->isAnnouncing);
TR_ASSERT(!std::empty(tier->announce_events));
TR_ASSERT(tier->currentTracker() != nullptr);
time_t const now = tr_time();
auto const now = tr_time();
tr_torrent* tor = tier->tor;
tr_announce_event const announce_event = tier_announce_event_pull(tier);
tr_announce_request* req = announce_request_new(announcer, tor, tier, announce_event);
auto* const data = new announce_data{ tier->id, now, announce_event, announcer->session, tor->isRunning };
auto const event = tier_announce_event_pull(tier);
auto const req = create_announce_request(announcer, tor, tier, event);
tier->isAnnouncing = true;
tier->lastAnnounceStartTime = now;
announce_request_delegate(announcer, req, onAnnounceDone, data);
auto tier_id = tier->id;
auto is_running_on_success = tor->isRunning;
announcer->announce(
req,
[announcer, tier_id, event, is_running_on_success](tr_announce_response const& response)
{ announcer->onAnnounceDone(tier_id, event, is_running_on_success, response); });
}
/***
@ -1254,15 +1242,15 @@ static void on_scrape_error(tr_session const* /*session*/, tr_tier* tier, char c
tier->scheduleNextScrape(interval);
}
static void checkMultiscrapeMax(tr_announcer* announcer, tr_scrape_response const* response)
static void checkMultiscrapeMax(tr_announcer_impl* announcer, tr_scrape_response const& response)
{
if (!multiscrape_too_big(response->errmsg))
if (!multiscrape_too_big(response.errmsg))
{
return;
}
auto const& url = response->scrape_url;
auto* const scrape_info = tr_announcerGetScrapeInfo(announcer, url);
auto const& url = response.scrape_url;
auto* const scrape_info = announcer->scrape_info(url);
if (scrape_info == nullptr)
{
return;
@ -1272,7 +1260,7 @@ static void checkMultiscrapeMax(tr_announcer* announcer, tr_scrape_response cons
// error. So if N parallel multiscrapes all have the same `max`
// and error out, lower the value once for that batch, not N times.
int& multiscrape_max = scrape_info->multiscrape_max;
if (multiscrape_max < response->row_count)
if (multiscrape_max < response.row_count)
{
return;
}
@ -1293,15 +1281,13 @@ static void checkMultiscrapeMax(tr_announcer* announcer, tr_scrape_response cons
}
}
static void on_scrape_done(tr_scrape_response const* response, void* vsession)
void tr_announcer_impl::onScrapeDone(tr_scrape_response const& response)
{
auto const now = tr_time();
auto* const session = static_cast<tr_session*>(vsession);
auto* const announcer = session->announcer;
for (int i = 0; i < response->row_count; ++i)
for (int i = 0; i < response.row_count; ++i)
{
auto const& row = response->rows[i];
auto const& row = response.rows[i];
auto* const tor = session->torrents().get(row.info_hash);
if (tor == nullptr)
@ -1309,7 +1295,7 @@ static void on_scrape_done(tr_scrape_response const* response, void* vsession)
continue;
}
auto* const tier = tor->torrent_announcer->getTierFromScrape(response->scrape_url);
auto* const tier = tor->torrent_announcer->getTierFromScrape(response.scrape_url);
if (tier == nullptr)
{
continue;
@ -1328,37 +1314,37 @@ static void on_scrape_done(tr_scrape_response const* response, void* vsession)
"downloaders:{} "
"min_request_interval:{} "
"err:{} ",
response->scrape_url.sv(),
response->did_connect,
response->did_timeout,
response.scrape_url.sv(),
response.did_connect,
response.did_timeout,
row.seeders,
row.leechers,
row.downloads,
row.downloaders,
response->min_request_interval,
std::empty(response->errmsg) ? "none"sv : response->errmsg));
response.min_request_interval,
std::empty(response.errmsg) ? "none"sv : response.errmsg));
tier->isScraping = false;
tier->lastScrapeTime = now;
tier->lastScrapeSucceeded = false;
tier->lastScrapeTimedOut = response->did_timeout;
tier->lastScrapeTimedOut = response.did_timeout;
if (!response->did_connect)
if (!response.did_connect)
{
on_scrape_error(session, tier, _("Could not connect to tracker"));
}
else if (response->did_timeout)
else if (response.did_timeout)
{
on_scrape_error(session, tier, _("Tracker did not respond"));
}
else if (!std::empty(response->errmsg))
else if (!std::empty(response.errmsg))
{
on_scrape_error(session, tier, response->errmsg.c_str());
on_scrape_error(session, tier, response.errmsg.c_str());
}
else
{
tier->lastScrapeSucceeded = true;
tier->scrapeIntervalSec = std::max(int{ DefaultScrapeIntervalSec }, response->min_request_interval);
tier->scrapeIntervalSec = std::max(int{ DefaultScrapeIntervalSec }, response.min_request_interval);
tier->scheduleNextScrape();
tr_logAddTraceTier(tier, fmt::format("Scrape successful. Rescraping in {} seconds.", tier->scrapeIntervalSec));
@ -1390,34 +1376,10 @@ static void on_scrape_done(tr_scrape_response const* response, void* vsession)
}
}
checkMultiscrapeMax(announcer, response);
checkMultiscrapeMax(this, response);
}
static void scrape_request_delegate(
tr_announcer* announcer,
tr_scrape_request const* request,
tr_scrape_response_func callback,
void* callback_data)
{
tr_session* session = announcer->session;
auto const scrape_sv = request->scrape_url.sv();
if (tr_strvStartsWith(scrape_sv, "http://"sv) || tr_strvStartsWith(scrape_sv, "https://"sv))
{
tr_tracker_http_scrape(session, request, callback, callback_data);
}
else if (tr_strvStartsWith(scrape_sv, "udp://"sv))
{
session->announcer_udp_->scrape(*request, callback, callback_data);
}
else
{
tr_logAddError(fmt::format(_("Unsupported URL: '{url}'"), fmt::arg("url", scrape_sv)));
}
}
static void multiscrape(tr_announcer* announcer, std::vector<tr_tier*> const& tiers)
static void multiscrape(tr_announcer_impl* announcer, std::vector<tr_tier*> const& tiers)
{
auto const now = tr_time();
auto requests = std::array<tr_scrape_request, MaxScrapesPerUpkeep>{};
@ -1472,20 +1434,10 @@ static void multiscrape(tr_announcer* announcer, std::vector<tr_tier*> const& ti
/* send the requests we just built */
for (size_t i = 0; i < request_count; ++i)
{
scrape_request_delegate(announcer, &requests[i], on_scrape_done, announcer->session);
announcer->scrape(requests[i], [announcer](tr_scrape_response const& response) { announcer->onScrapeDone(response); });
}
}
static void flushCloseMessages(tr_announcer* announcer)
{
auto& stops = announcer->stops;
std::for_each(
std::begin(stops),
std::end(stops),
[&announcer](auto* stop) { announce_request_delegate(announcer, stop, nullptr, nullptr); });
stops.clear();
}
static int compareAnnounceTiers(tr_tier const* a, tr_tier const* b)
{
/* prefer higher-priority events */
@ -1525,7 +1477,7 @@ static int compareAnnounceTiers(tr_tier const* a, tr_tier const* b)
return a < b ? -1 : 1;
}
static void scrapeAndAnnounceMore(tr_announcer* announcer)
static void scrapeAndAnnounceMore(tr_announcer_impl* announcer)
{
time_t const now = tr_time();
@ -1573,28 +1525,20 @@ static void scrapeAndAnnounceMore(tr_announcer* announcer)
}
}
void tr_announcer::upkeep()
void tr_announcer_impl::upkeep()
{
auto const lock = session->unique_lock();
auto const is_closing = session->isClosing();
auto const now = tr_time();
// maybe send out some "stopped" messages for closed torrents
flushCloseMessages();
/* maybe send out some "stopped" messages for closed torrents */
flushCloseMessages(this);
/* maybe kick off some scrapes / announces whose time has come */
if (!is_closing)
// maybe kick off some scrapes / announces whose time has come
if (!session->isClosing())
{
scrapeAndAnnounceMore(this);
}
/* TAU upkeep */
if (this->tau_upkeep_at <= now)
{
this->tau_upkeep_at = now + TauUpkeepIntervalSecs;
session->announcer_udp_->upkeep();
}
announcer_udp_.upkeep();
}
/***
@ -1735,11 +1679,11 @@ tr_tracker_view tr_announcerTracker(tr_torrent const* tor, size_t nth)
// called after the torrent's announceList was rebuilt --
// so announcer needs to update the tr_tier / tr_trackers to match
void tr_announcerResetTorrent(tr_announcer* /*announcer*/, tr_torrent* tor)
void tr_announcer_impl::resetTorrent(tr_torrent* tor)
{
// make a new tr_announcer_tier
auto* const older = tor->torrent_announcer;
tor->torrent_announcer = new tr_torrent_announcer(tor->session->announcer, tor);
tor->torrent_announcer = new tr_torrent_announcer{ this, tor };
auto* const newer = tor->torrent_announcer;
// copy the tracker counts into the new replacementa

View File

@ -12,6 +12,7 @@
#include <cstddef> // size_t
#include <cstdint> // uint32_t
#include <ctime>
#include <functional>
#include <string_view>
#include <vector>
@ -20,70 +21,70 @@
#include "interned-string.h"
#include "net.h"
struct tr_announcer;
class tr_announcer;
class tr_announcer_udp;
struct tr_torrent_announcer;
/**
* *** Tracker Publish / Subscribe
* **/
enum TrackerEventType
{
TR_TRACKER_WARNING,
TR_TRACKER_ERROR,
TR_TRACKER_ERROR_CLEAR,
TR_TRACKER_PEERS,
TR_TRACKER_COUNTS,
};
*** Tracker Publish / Subscribe
**/
struct tr_pex;
/** @brief Notification object to tell listeners about announce or scrape occurrences */
struct tr_tracker_event
{
/* what type of event this is */
TrackerEventType messageType;
enum class Type
{
Error,
ErrorClear,
Counts,
Peers,
Warning,
};
/* for TR_TRACKER_WARNING and TR_TRACKER_ERROR */
// What type of event this is
Type type;
// for Warning and Error events
std::string_view text;
tr_interned_string announce_url;
/* for TR_TRACKER_PEERS */
// for Peers events
std::vector<tr_pex> pex;
/* for TR_TRACKER_PEERS and TR_TRACKER_COUNTS */
// for Peers and Counts events
int leechers;
int seeders;
};
using tr_tracker_callback = void (*)(tr_torrent* tor, tr_tracker_event const* event, void* client_data);
/**
*** Session ctor/dtor
**/
class tr_announcer
{
public:
[[nodiscard]] static std::unique_ptr<tr_announcer> create(tr_session* session, tr_announcer_udp&);
virtual ~tr_announcer() = default;
void tr_announcerInit(tr_session*);
virtual tr_torrent_announcer* addTorrent(tr_torrent*, tr_tracker_callback callback, void* callback_data) = 0;
virtual void startTorrent(tr_torrent* tor) = 0;
virtual void stopTorrent(tr_torrent* tor) = 0;
virtual void resetTorrent(tr_torrent* tor) = 0;
virtual void removeTorrent(tr_torrent* tor) = 0;
};
void tr_announcerClose(tr_session*);
std::unique_ptr<tr_announcer> tr_announcerCreate(tr_session* session);
/**
*** For torrent customers
**/
struct tr_torrent_announcer* tr_announcerAddTorrent(tr_torrent* torrent, tr_tracker_callback callback, void* callback_data);
void tr_announcerResetTorrent(struct tr_announcer*, tr_torrent*);
void tr_announcerRemoveTorrent(struct tr_announcer*, tr_torrent*);
void tr_announcerChangeMyPort(tr_torrent*);
bool tr_announcerCanManualAnnounce(tr_torrent const*);
void tr_announcerManualAnnounce(tr_torrent*);
void tr_announcerTorrentStarted(tr_torrent*);
void tr_announcerTorrentStopped(tr_torrent*);
void tr_announcerTorrentCompleted(tr_torrent*);
enum
@ -116,14 +117,15 @@ enum tr_announce_event
struct tr_announce_request;
struct tr_announce_response;
using tr_announce_response_func = void (*)(tr_announce_response const* response, void* userdata);
struct tr_scrape_request;
struct tr_scrape_response;
using tr_scrape_response_func = void (*)(tr_scrape_response const* response, void* user_data);
/// UDP ANNOUNCER
using tr_scrape_response_func = std::function<void(tr_scrape_response const&)>;
using tr_announce_response_func = std::function<void(tr_announce_response const&)>;
class tr_announcer_udp
{
public:
@ -141,9 +143,9 @@ public:
[[nodiscard]] virtual bool isIdle() const noexcept = 0;
virtual void announce(tr_announce_request const& request, tr_announce_response_func response_func, void* user_data) = 0;
virtual void announce(tr_announce_request const& request, tr_announce_response_func on_response) = 0;
virtual void scrape(tr_scrape_request const& request, tr_scrape_response_func response_func, void* user_data) = 0;
virtual void scrape(tr_scrape_request const& request, tr_scrape_response_func on_response) = 0;
virtual void upkeep() = 0;

View File

@ -558,8 +558,6 @@ void tr_session::initImpl(init_data& data)
this->blocklists_ = libtransmission::Blocklist::loadBlocklists(blocklist_dir_, useBlocklist());
tr_announcerInit(this);
tr_logAddInfo(fmt::format(_("Transmission version {version} starting"), fmt::arg("version", LONG_VERSION_STRING)));
setSettings(client_settings, true);
@ -1248,7 +1246,7 @@ void tr_session::closeImplPart1(std::promise<void>* closed_promise)
// remaining `event=stopped` announce messages are queued in
// the announcer. The announcer's destructor sends all those
// out via `web_`...
tr_announcerClose(this);
this->announcer_.reset();
// ...and now that those are queued, tell web_ that we're
// shutting down soon. This leaves the `event=stopped` messages
// in the queue but refuses to take any _new_ tasks
@ -1503,7 +1501,7 @@ void tr_session::setDefaultTrackers(std::string_view trackers)
{
if (tor->isPublic())
{
tr_announcerResetTorrent(announcer, tor);
announcer_->resetTorrent(tor);
}
}
}

View File

@ -58,7 +58,6 @@ class tr_rpc_server;
class tr_session_thread;
class tr_web;
struct struct_utp_context;
struct tr_announcer;
struct tr_variant;
namespace libtransmission
@ -1151,7 +1150,7 @@ public:
std::unique_ptr<tr_announcer_udp> announcer_udp_ = tr_announcer_udp::create(announcer_udp_mediator_);
// depends-on: settings_, torrents_, web_, announcer_udp_
struct tr_announcer* announcer = nullptr;
std::unique_ptr<tr_announcer> announcer_ = tr_announcer::create(this, *announcer_udp_);
// depends-on: public_peer_port_, udp_core_, dht_mediator_
std::unique_ptr<tr_dht> dht_;

View File

@ -462,14 +462,14 @@ static void tr_torrentClearError(tr_torrent* tor)
static void onTrackerResponse(tr_torrent* tor, tr_tracker_event const* event, void* /*user_data*/)
{
switch (event->messageType)
switch (event->type)
{
case TR_TRACKER_PEERS:
case tr_tracker_event::Type::Peers:
tr_logAddTraceTor(tor, fmt::format("Got {} peers from tracker", std::size(event->pex)));
tr_peerMgrAddPex(tor, TR_PEER_FROM_TRACKER, std::data(event->pex), std::size(event->pex));
break;
case TR_TRACKER_COUNTS:
case tr_tracker_event::Type::Counts:
if (tor->isPrivate() && (event->leechers == 0))
{
tr_peerMgrSetSwarmIsAllSeeds(tor);
@ -477,20 +477,20 @@ static void onTrackerResponse(tr_torrent* tor, tr_tracker_event const* event, vo
break;
case TR_TRACKER_WARNING:
case tr_tracker_event::Type::Warning:
tr_logAddWarnTor(tor, fmt::format(_("Tracker warning: '{warning}'"), fmt::arg("warning", event->text)));
tor->error = TR_STAT_TRACKER_WARNING;
tor->error_announce_url = event->announce_url;
tor->error_string = event->text;
break;
case TR_TRACKER_ERROR:
case tr_tracker_event::Type::Error:
tor->error = TR_STAT_TRACKER_ERROR;
tor->error_announce_url = event->announce_url;
tor->error_string = event->text;
break;
case TR_TRACKER_ERROR_CLEAR:
case tr_tracker_event::Type::ErrorClear:
if (tor->error != TR_STAT_LOCAL_ERROR)
{
tr_torrentClearError(tor);
@ -762,7 +762,7 @@ static void torrentInit(tr_torrent* tor, tr_ctor const* ctor)
}
}
tor->torrent_announcer = tr_announcerAddTorrent(tor, onTrackerResponse, nullptr);
tor->torrent_announcer = session->announcer_->addTorrent(tor, onTrackerResponse, nullptr);
if (is_new_torrent)
{
@ -1263,7 +1263,7 @@ static void freeTorrent(tr_torrent* tor)
tr_peerMgrRemoveTorrent(tor);
tr_announcerRemoveTorrent(session->announcer, tor);
session->announcer_->removeTorrent(tor);
session->torrents().remove(tor, tr_time());
@ -1311,7 +1311,7 @@ static void torrentStartImpl(tr_torrent* const tor)
tor->finishedSeedingByIdle = false;
tr_torrentResetTransferStats(tor);
tr_announcerTorrentStarted(tor);
tor->session->announcer_->startTorrent(tor);
tor->lpdAnnounceAt = now;
tr_peerMgrStartTorrent(tor);
}
@ -1495,7 +1495,7 @@ static void stopTorrent(tr_torrent* const tor)
tor->session->verifyRemove(tor);
tr_peerMgrStopTorrent(tor);
tr_announcerTorrentStopped(tor);
tor->session->announcer_->stopTorrent(tor);
tor->session->closeTorrentFiles(tor);
@ -1991,7 +1991,7 @@ bool tr_torrent::setTrackerList(std::string_view text)
}
/* tell the announcer to reload this torrent's tracker list */
tr_announcerResetTorrent(this->session->announcer, this);
this->session->announcer_->resetTorrent(this);
return true;
}

View File

@ -317,11 +317,7 @@ TEST_F(AnnouncerUdpTest, canScrape)
// tell announcer to scrape
auto [request, expected_response] = buildSimpleScrapeRequestAndResponse();
auto response = std::optional<tr_scrape_response>{};
announcer->scrape(
request,
[](tr_scrape_response const* resp, void* vresponse)
{ *static_cast<std::optional<tr_scrape_response>*>(vresponse) = *resp; },
&response);
announcer->scrape(request, [&response](tr_scrape_response const& resp) { response = resp; });
EXPECT_FALSE(announcer->isIdle());
// The announcer should have sent a UDP connection request.
@ -362,11 +358,7 @@ TEST_F(AnnouncerUdpTest, canScrape)
// Since the timestamp hasn't changed, the connection should be good
// and announcer-udp should skip the `connect` step, going straight to the scrape.
response.reset();
announcer->scrape(
request,
[](tr_scrape_response const* resp, void* vresponse)
{ *static_cast<std::optional<tr_scrape_response>*>(vresponse) = *resp; },
&response);
announcer->scrape(request, [&response](tr_scrape_response const& resp) { response = resp; });
// The announcer should have sent a UDP connection request.
// Inspect that request for validity.
@ -384,11 +376,7 @@ TEST_F(AnnouncerUdpTest, canDestructCleanlyEvenWhenBusy)
// tell announcer to scrape
auto [request, expected_response] = buildSimpleScrapeRequestAndResponse();
auto response = std::optional<tr_scrape_response>{};
announcer->scrape(
request,
[](tr_scrape_response const* resp, void* vresponse)
{ *static_cast<std::optional<tr_scrape_response>*>(vresponse) = *resp; },
&response);
announcer->scrape(request, [&response](tr_scrape_response const& resp) { response = resp; });
// The announcer should have sent a UDP connection request.
// Inspect that request for validity.
@ -417,11 +405,7 @@ TEST_F(AnnouncerUdpTest, canMultiScrape)
auto request = buildScrapeRequestFromResponse(expected_response);
auto response = std::optional<tr_scrape_response>{};
announcer->scrape(
request,
[](tr_scrape_response const* resp, void* vresponse)
{ *static_cast<std::optional<tr_scrape_response>*>(vresponse) = *resp; },
&response);
announcer->scrape(request, [&response](tr_scrape_response const& resp) { response = resp; });
// Announcer will request a connection. Verify and grant the request
auto sent = waitForAnnouncerToSendMessage(mediator);
@ -480,11 +464,7 @@ TEST_F(AnnouncerUdpTest, canHandleScrapeError)
// tell announcer to scrape
auto response = std::optional<tr_scrape_response>{};
announcer->scrape(
request,
[](tr_scrape_response const* resp, void* vresponse)
{ *static_cast<std::optional<tr_scrape_response>*>(vresponse) = *resp; },
&response);
announcer->scrape(request, [&response](tr_scrape_response const& resp) { response = resp; });
// The announcer should have sent a UDP connection request.
// Inspect that request for validity.
@ -532,9 +512,7 @@ TEST_F(AnnouncerUdpTest, canHandleConnectError)
auto response = std::optional<tr_scrape_response>{};
announcer->scrape(
buildScrapeRequestFromResponse(expected_response),
[](tr_scrape_response const* resp, void* vresponse)
{ *static_cast<std::optional<tr_scrape_response>*>(vresponse) = *resp; },
&response);
[&response](tr_scrape_response const& resp) { response = resp; });
// The announcer should have sent a UDP connection request.
// Inspect that request for validity.
@ -564,11 +542,7 @@ TEST_F(AnnouncerUdpTest, handleMessageReturnsFalseOnInvalidMessage)
// tell the announcer to scrape
auto response = std::optional<tr_scrape_response>{};
announcer->scrape(
request,
[](tr_scrape_response const* resp, void* vresponse)
{ *static_cast<std::optional<tr_scrape_response>*>(vresponse) = *resp; },
&response);
announcer->scrape(request, [&response](tr_scrape_response const& resp) { response = resp; });
// The announcer should have sent a UDP connection request.
// Inspect that request for validity.
@ -649,11 +623,7 @@ TEST_F(AnnouncerUdpTest, canAnnounce)
auto upkeep_timer = createUpkeepTimer(mediator, announcer);
auto response = std::optional<tr_announce_response>{};
announcer->announce(
request,
[](tr_announce_response const* resp, void* vresponse)
{ *static_cast<std::optional<tr_announce_response>*>(vresponse) = *resp; },
&response);
announcer->announce(request, [&response](tr_announce_response const& resp) { response = resp; });
// Announcer will request a connection. Verify and grant the request
auto sent = waitForAnnouncerToSendMessage(mediator);