perf: prioritize announces based on scrape stats (#1782)

* perf: prioritize announces based on scrape stats

This change is to improve responsiveness when starting a large batch of
torrents, e.g. when starting up a seedbox, but is also generally useful.

First, try to scrape as many torrents as as quickly as possible, giving
preference to that even over announces. Thanks to multiscrape we can run
through scrapes an order of magnitude faster than announces. Then scrape
responses tell us which swarms have leechers and we can prioritize those
torrents' announces.

Second, increase how many scrapes and announces we start at a time. This
limit probably hasn't been needed since 2da97b25 and by scraping faster,
the sooner we'll know which torrents to announce first.

* chore: remove printf() tracer
This commit is contained in:
Charles Kerr 2021-09-09 19:28:22 -05:00 committed by GitHub
parent d6cb99e57c
commit 756bb349d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 110 additions and 82 deletions

View File

@ -30,13 +30,13 @@ enum
* This is only an upper bound: if the tracker complains about
* length, announcer will incrementally lower the batch size.
*/
TR_MULTISCRAPE_MAX = 100
TR_MULTISCRAPE_MAX = 60
};
typedef struct
{
/* the scrape URL */
char* url;
char const* url;
/* the name to use when deep logging is enabled */
char log_name[128];
@ -113,10 +113,13 @@ void tr_tracker_udp_scrape(
typedef enum
{
/* Note: the ordering of this enum's values is important to
* announcer.c's tr_tier.announce_event_priority. If changing
* the enum, ensure announcer.c is compatible with the change. */
TR_ANNOUNCE_EVENT_NONE,
TR_ANNOUNCE_EVENT_COMPLETED,
TR_ANNOUNCE_EVENT_STARTED,
TR_ANNOUNCE_EVENT_STOPPED
TR_ANNOUNCE_EVENT_COMPLETED,
TR_ANNOUNCE_EVENT_STOPPED,
} tr_announce_event;
char const* tr_announce_event_get_string(tr_announce_event);

View File

@ -51,12 +51,14 @@ enum
DEFAULT_ANNOUNCE_INTERVAL_SEC = (60 * 10),
/* unless the tracker says otherwise, this is the announce min_interval */
DEFAULT_ANNOUNCE_MIN_INTERVAL_SEC = (60 * 2),
/* how many web tasks we allow at one time */
MAX_CONCURRENT_TASKS = 48,
/* the value of the 'numwant' argument passed in tracker requests. */
NUMWANT = 80,
/* */
UPKEEP_INTERVAL_SECS = 1,
/* how often to announce & scrape */
UPKEEP_INTERVAL_MSEC = 500,
MAX_ANNOUNCES_PER_UPKEEP = 20,
MAX_SCRAPES_PER_UPKEEP = 20,
/* this is how often to call the UDP tracker upkeep */
TAU_UPKEEP_INTERVAL_SECS = 5,
@ -172,7 +174,6 @@ typedef struct tr_announcer
tr_session* session;
struct event* upkeepTimer;
int slotsAvailable;
int key;
time_t tauUpkeepAt;
} tr_announcer;
@ -212,9 +213,8 @@ void tr_announcerInit(tr_session* session)
a->stops = TR_PTR_ARRAY_INIT;
a->key = tr_rand_int(INT_MAX);
a->session = session;
a->slotsAvailable = MAX_CONCURRENT_TASKS;
a->upkeepTimer = evtimer_new(session->event_base, onUpkeepTimer, a);
tr_timerAdd(a->upkeepTimer, UPKEEP_INTERVAL_SECS, 0);
tr_timerAddMsec(a->upkeepTimer, UPKEEP_INTERVAL_MSEC);
session->announcer = a;
}
@ -331,6 +331,7 @@ typedef struct tr_tier
bool lastAnnounceTimedOut;
tr_announce_event* announce_events;
int announce_event_priority;
int announce_event_count;
int announce_event_alloc;
@ -389,7 +390,7 @@ static void tierConstruct(tr_tier* tier, tr_torrent* tor)
tier->scrapeIntervalSec = DEFAULT_SCRAPE_INTERVAL_SEC;
tier->announceIntervalSec = DEFAULT_ANNOUNCE_INTERVAL_SEC;
tier->announceMinIntervalSec = DEFAULT_ANNOUNCE_MIN_INTERVAL_SEC;
tier->scrapeAt = get_next_scrape_time(tor->session, tier, tr_rand_int_weak(180));
tier->scrapeAt = get_next_scrape_time(tor->session, tier, 0);
tier->tor = tor;
}
@ -832,12 +833,27 @@ static void dbgmsg_tier_announce_queue(tr_tier const* tier)
}
}
// higher priorities go to the front of the announce queue
static void tier_update_announce_priority(tr_tier* tier)
{
int priority = -1;
for (int i = 0; i < tier->announce_event_count; ++i)
{
priority = MAX(priority, (int)tier->announce_events[i]);
}
tier->announce_event_priority = priority;
}
static void tier_announce_remove_trailing(tr_tier* tier, tr_announce_event e)
{
while (tier->announce_event_count > 0 && tier->announce_events[tier->announce_event_count - 1] == e)
{
--tier->announce_event_count;
}
tier_update_announce_priority(tier);
}
static void tier_announce_event_push(tr_tier* tier, tr_announce_event e, time_t announceAt)
@ -866,6 +882,7 @@ static void tier_announce_event_push(tr_tier* tier, tr_announce_event e, time_t
if (has_completed)
{
tier->announce_events[tier->announce_event_count++] = c;
tier_update_announce_priority(tier);
}
}
@ -884,8 +901,9 @@ static void tier_announce_event_push(tr_tier* tier, tr_announce_event e, time_t
}
/* add it */
tier->announce_events[tier->announce_event_count++] = e;
tier->announceAt = announceAt;
tier->announce_events[tier->announce_event_count++] = e;
tier_update_announce_priority(tier);
dbgmsg_tier_announce_queue(tier);
dbgmsg(tier, "announcing in %d seconds", (int)difftime(announceAt, tr_time()));
@ -897,6 +915,7 @@ static tr_announce_event tier_announce_event_pull(tr_tier* tier)
tr_removeElementFromArray(tier->announce_events, 0, sizeof(tr_announce_event), tier->announce_event_count);
--tier->announce_event_count;
tier_update_announce_priority(tier);
return e;
}
@ -1086,11 +1105,6 @@ static void on_announce_done(tr_announce_response const* response, void* vdata)
time_t const now = tr_time();
tr_announce_event const event = data->event;
if (announcer != NULL)
{
++announcer->slotsAvailable;
}
if (tier != NULL)
{
tr_tracker* tracker;
@ -1330,7 +1344,6 @@ static void tierAnnounce(tr_announcer* announcer, tr_tier* tier)
tier->isAnnouncing = true;
tier->lastAnnounceStartTime = now;
--announcer->slotsAvailable;
announce_request_delegate(announcer, req, on_announce_done, data);
}
@ -1467,14 +1480,13 @@ static void on_scrape_done(tr_scrape_response const* response, void* vsession)
}
else
{
tr_tracker* tracker;
tier->lastScrapeSucceeded = true;
tier->scrapeIntervalSec = MAX(DEFAULT_SCRAPE_INTERVAL_SEC, response->min_request_interval);
tier->scrapeAt = get_next_scrape_time(session, tier, tier->scrapeIntervalSec);
tr_logAddTorDbg(tier->tor, "Scrape successful. Rescraping in %d seconds.", tier->scrapeIntervalSec);
if ((tracker = tier->currentTracker) != NULL)
tr_tracker* const tracker = tier->currentTracker;
if (tracker != NULL)
{
if (row->seeders >= 0)
{
@ -1539,11 +1551,6 @@ static void on_scrape_done(tr_scrape_response const* response, void* vsession)
}
}
}
if (announcer != NULL)
{
++announcer->slotsAvailable;
}
}
static void scrape_request_delegate(
@ -1570,14 +1577,13 @@ static void scrape_request_delegate(
static void multiscrape(tr_announcer* announcer, tr_ptrArray* tiers)
{
int request_count = 0;
size_t request_count = 0;
time_t const now = tr_time();
int const tier_count = tr_ptrArraySize(tiers);
int const max_request_count = MIN(announcer->slotsAvailable, tier_count);
tr_scrape_request* requests = tr_new0(tr_scrape_request, max_request_count);
size_t const tier_count = tr_ptrArraySize(tiers);
tr_scrape_request requests[MAX_SCRAPES_PER_UPKEEP] = { 0 };
/* batch as many info_hashes into a request as we can */
for (int i = 0; i < tier_count; ++i)
for (size_t i = 0; i < tier_count; ++i)
{
tr_tier* tier = tr_ptrArrayNth(tiers, i);
struct tr_scrape_info* const scrape_info = tier->currentTracker->scrape_info;
@ -1587,7 +1593,7 @@ static void multiscrape(tr_announcer* announcer, tr_ptrArray* tiers)
TR_ASSERT(scrape_info != NULL);
/* if there's a request with this scrape URL and a free slot, use it */
for (int j = 0; !found && j < request_count; ++j)
for (size_t j = 0; !found && j < request_count; ++j)
{
tr_scrape_request* req = &requests[j];
@ -1608,7 +1614,7 @@ static void multiscrape(tr_announcer* announcer, tr_ptrArray* tiers)
}
/* otherwise, if there's room for another request, build a new one */
if (!found && request_count < max_request_count)
if (!found && request_count < MAX_SCRAPES_PER_UPKEEP)
{
tr_scrape_request* req = &requests[request_count++];
req->url = scrape_info->url;
@ -1621,13 +1627,10 @@ static void multiscrape(tr_announcer* announcer, tr_ptrArray* tiers)
}
/* send the requests we just built */
for (int i = 0; i < request_count; ++i)
for (size_t i = 0; i < request_count; ++i)
{
scrape_request_delegate(announcer, &requests[i], on_scrape_done, announcer->session);
}
/* cleanup */
tr_free(requests);
}
static void flushCloseMessages(tr_announcer* announcer)
@ -1640,58 +1643,82 @@ static void flushCloseMessages(tr_announcer* announcer)
tr_ptrArrayClear(&announcer->stops);
}
static bool tierNeedsToAnnounce(tr_tier const* tier, time_t const now)
static inline bool tierNeedsToAnnounce(tr_tier const* tier, time_t const now)
{
return !tier->isAnnouncing && !tier->isScraping && tier->announceAt != 0 && tier->announceAt <= now &&
tier->announce_event_count > 0;
}
static bool tierNeedsToScrape(tr_tier const* tier, time_t const now)
static inline bool tierNeedsToScrape(tr_tier const* tier, time_t const now)
{
return !tier->isScraping && tier->scrapeAt != 0 && tier->scrapeAt <= now && tier->currentTracker != NULL &&
tier->currentTracker->scrape_info != NULL;
}
static int compareTiers(void const* va, void const* vb)
static inline int countDownloaders(tr_tier const* tier)
{
int ret;
tr_tier const* a = *(tr_tier const* const*)va;
tr_tier const* b = *(tr_tier const* const*)vb;
tr_tracker const* const tracker = tier->currentTracker;
/* primary key: larger stats come before smaller */
ret = compareTransfer(
return tracker == NULL ? 0 : tracker->downloaderCount + tracker->leecherCount;
}
static int compareAnnounceTiers(void const* va, void const* vb)
{
tr_tier const* a = (tr_tier const*)va;
tr_tier const* b = (tr_tier const*)vb;
/* prefer higher-priority events */
int const priority_a = a->announce_event_priority;
int const priority_b = b->announce_event_priority;
if (priority_a != priority_b)
{
return priority_a > priority_b ? -1 : 1;
}
/* prefer swarms where we might upload */
int const downloader_count_a = countDownloaders(a);
int const downloader_count_b = countDownloaders(b);
if (downloader_count_a != downloader_count_b)
{
return downloader_count_a > downloader_count_b ? -1 : 1;
}
/* prefer swarms where we might download */
bool const is_seed_a = tr_torrentIsSeed(a->tor);
bool const is_seed_b = tr_torrentIsSeed(b->tor);
if (is_seed_a != is_seed_b)
{
return is_seed_a ? 1 : -1;
}
/* prefer larger stats, to help ensure stats get recorded when stopping on shutdown */
int const xfer = compareTransfer(
a->byteCounts[TR_ANN_UP],
a->byteCounts[TR_ANN_DOWN],
b->byteCounts[TR_ANN_UP],
b->byteCounts[TR_ANN_DOWN]);
/* secondary key: announcements that have been waiting longer go first */
if (ret == 0 && a->announceAt != b->announceAt)
if (xfer)
{
ret = a->announceAt < b->announceAt ? -1 : 1;
return xfer;
}
return ret;
// announcements that have been waiting longer go first
if (a->announceAt != b->announceAt)
{
return a->announceAt < b->announceAt ? -1 : 1;
}
return 0;
}
static void announceMore(tr_announcer* announcer)
static void scrapeAndAnnounceMore(tr_announcer* announcer)
{
int n;
tr_torrent* tor;
tr_ptrArray announceMe = TR_PTR_ARRAY_INIT;
tr_ptrArray scrapeMe = TR_PTR_ARRAY_INIT;
time_t const now = tr_time();
dbgmsg(NULL, "announceMore: slotsAvailable is %d", announcer->slotsAvailable);
if (announcer->slotsAvailable < 1)
{
return;
}
/* build a list of tiers that need to be announced */
tor = NULL;
tr_ptrArray announceMe = TR_PTR_ARRAY_INIT;
tr_ptrArray scrapeMe = TR_PTR_ARRAY_INIT;
tr_torrent* tor = NULL;
while ((tor = tr_torrentNext(announcer->session, tor)) != NULL)
{
struct tr_torrent_tiers* tt = tor->tiers;
@ -1702,25 +1729,25 @@ static void announceMore(tr_announcer* announcer)
if (tierNeedsToAnnounce(tier, now))
{
tr_ptrArrayAppend(&announceMe, tier);
tr_ptrArrayInsertSorted(&announceMe, tier, compareAnnounceTiers);
}
else if (tierNeedsToScrape(tier, now))
if (tierNeedsToScrape(tier, now))
{
tr_ptrArrayAppend(&scrapeMe, tier);
}
}
}
n = tr_ptrArraySize(&announceMe);
/* First, scrape what we can. We handle scrapes first because
* we can work through that queue much faster than announces
* (thanks to multiscrape) _and_ the scrape responses will tell
* us which swarms are interesting and should be announced next. */
multiscrape(announcer, &scrapeMe);
/* if there are more tiers than slots available, prioritize */
if (n > announcer->slotsAvailable)
{
qsort(tr_ptrArrayBase(&announceMe), n, sizeof(tr_tier*), compareTiers);
n = announcer->slotsAvailable;
}
/* announce some */
/* Second, announce what we can. If there aren't enough slots
* available, use compareAnnounceTiers to prioritize. */
int n = MIN(tr_ptrArraySize(&announceMe), MAX_ANNOUNCES_PER_UPKEEP);
for (int i = 0; i < n; ++i)
{
tr_tier* tier = tr_ptrArrayNth(&announceMe, i);
@ -1729,9 +1756,6 @@ static void announceMore(tr_announcer* announcer)
tierAnnounce(announcer, tier);
}
/* scrape some */
multiscrape(announcer, &scrapeMe);
/* cleanup */
tr_ptrArrayDestruct(&scrapeMe, NULL);
tr_ptrArrayDestruct(&announceMe, NULL);
@ -1752,10 +1776,10 @@ static void onUpkeepTimer(evutil_socket_t fd, short what, void* vannouncer)
/* maybe send out some "stopped" messages for closed torrents */
flushCloseMessages(announcer);
/* maybe send out some announcements to trackers */
/* maybe kick off some scrapes / announces whose time has come */
if (!is_closing)
{
announceMore(announcer);
scrapeAndAnnounceMore(announcer);
}
/* TAU upkeep */
@ -1766,7 +1790,7 @@ static void onUpkeepTimer(evutil_socket_t fd, short what, void* vannouncer)
}
/* set up the next timer */
tr_timerAdd(announcer->upkeepTimer, UPKEEP_INTERVAL_SECS, 0);
tr_timerAddMsec(announcer->upkeepTimer, UPKEEP_INTERVAL_MSEC);
tr_sessionUnlock(session);
}
@ -1915,6 +1939,7 @@ static void copy_tier_attributes_impl(struct tr_tier* tgt, int trackerIndex, tr_
tgt->trackers = keep.trackers;
tgt->tracker_count = keep.tracker_count;
tgt->announce_events = tr_memdup(src->announce_events, sizeof(tr_announce_event) * src->announce_event_count);
tgt->announce_event_priority = src->announce_event_priority;
tgt->announce_event_count = src->announce_event_count;
tgt->announce_event_alloc = src->announce_event_count;
tgt->currentTrackerIndex = trackerIndex;