From 827dc86bb4cc69aeceb00c4f364165acef4eaed3 Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Thu, 29 Nov 2007 00:43:58 +0000 Subject: [PATCH] rewrite the tracker code. this should improve and/or fix a number of bugs, including "too many open files", "router death", "slow internet", and the mutex release crash. --- libtransmission/internal.h | 1 + libtransmission/tracker.c | 1006 +++++++++++++++++--------------- libtransmission/transmission.c | 4 +- libtransmission/trevent.c | 28 - libtransmission/trevent.h | 6 - 5 files changed, 532 insertions(+), 513 deletions(-) diff --git a/libtransmission/internal.h b/libtransmission/internal.h index 5df62e46c..8ce58008f 100644 --- a/libtransmission/internal.h +++ b/libtransmission/internal.h @@ -195,6 +195,7 @@ struct tr_handle uint8_t isClosed; struct tr_stats_handle * sessionStats; + struct tr_tracker_handle * tracker; }; void tr_globalLock ( struct tr_handle * ); diff --git a/libtransmission/tracker.c b/libtransmission/tracker.c index 199b043ad..b619dee35 100644 --- a/libtransmission/tracker.c +++ b/libtransmission/tracker.c @@ -24,6 +24,7 @@ #include "transmission.h" #include "bencode.h" #include "completion.h" +#include "list.h" #include "net.h" #include "publish.h" #include "shared.h" @@ -33,6 +34,12 @@ enum { + /* seconds between tracker pulses */ + PULSE_INTERVAL_SEC = 1, + + /* maximum number of concurrent tracker socket connections */ + MAX_TRACKER_SOCKETS = 16, + /* unless the tracker says otherwise, rescrape this frequently */ DEFAULT_SCRAPE_INTERVAL_SEC = (60 * 15), @@ -60,6 +67,15 @@ enum *** **/ +enum +{ + TR_REQ_STARTED, + TR_REQ_COMPLETED, + TR_REQ_STOPPED, + TR_REQ_REANNOUNCE, + TR_REQ_COUNT +}; + struct tr_tracker { tr_handle * handle; @@ -106,11 +122,10 @@ struct tr_tracker char * lastRequest; time_t manualAnnounceAllowedAt; + time_t reannounceAt; + time_t scrapeAt; - tr_timer * scrapeTimer; - tr_timer * reannounceTimer; - - unsigned int isRunning : 1; + unsigned int isRunning : 1; }; /** @@ -144,33 +159,55 @@ myDebug( const char * file, int line, const tr_tracker * t, const char * fmt, .. #define dbgmsg(t, fmt...) myDebug(__FILE__, __LINE__, t, ##fmt ) - /*** -**** Connections that know how to clean up after themselves +**** ***/ +static tr_tracker_info * +getCurrentAddress( const tr_tracker * t ) +{ + assert( t->addresses != NULL ); + assert( t->addressIndex >= 0 ); + assert( t->addressIndex < t->addressCount ); + + return t->redirect ? t->redirect + : t->addresses + t->addressIndex; +} + static int -freeConnection( void * evcon ) +trackerSupportsScrape( const tr_tracker * t ) { - evhttp_connection_free( evcon ); - return FALSE; + const tr_tracker_info * info = getCurrentAddress( t ); + + return ( info != NULL ) + && ( info->scrape != NULL ) + && ( info->scrape[0] != '\0' ); } -static void -connectionClosedCB( struct evhttp_connection * evcon, void * handle ) +/*** +**** +***/ + +struct torrent_hash { - /* libevent references evcon right after calling this function, - so we can't free it yet... defer it to after this call chain - has played out */ - tr_timerNew( handle, freeConnection, evcon, 100 ); + tr_handle * handle; + uint8_t hash[SHA_DIGEST_LENGTH]; +}; + +static struct torrent_hash* +torrentHashNew( tr_handle * handle, const tr_tracker * t ) +{ + struct torrent_hash * data = tr_new( struct torrent_hash, 1 ); + data->handle = handle; + memcpy( data->hash, t->hash, SHA_DIGEST_LENGTH ); + return data; } -static struct evhttp_connection* -getConnection( tr_tracker * t, const char * address, int port ) +tr_tracker * +findTrackerFromHash( struct torrent_hash * data ) { - struct evhttp_connection * c = evhttp_connection_new( address, port ); - evhttp_connection_set_closecb( c, connectionClosedCB, t->handle ); - return c; + tr_torrent * torrent = tr_torrentFindFromHash( data->handle, data->hash ); + return torrent ? torrent->tracker : NULL; } /*** @@ -220,135 +257,15 @@ publishNewPeers( tr_tracker * t, int count, uint8_t * peers ) } /*** -**** LIFE CYCLE +**** ***/ static void -generateKeyParam( char * msg, int len ) +onStoppedResponse( struct evhttp_request * req UNUSED, void * handle UNUSED ) { - int i; - const char * pool = "abcdefghijklmnopqrstuvwxyz0123456789"; - const int poolSize = strlen( pool ); - for( i=0; iinfo; - int i, j, sum, *iwalk; - tr_tracker_info * nwalk; - tr_tracker * t; - - tr_dbg( "making a new tracker for \"%s\"", info->primaryAddress ); - - t = tr_new0( tr_tracker, 1 ); - t->handle = torrent->handle; - t->scrapeIntervalSec = DEFAULT_SCRAPE_INTERVAL_SEC; - t->announceIntervalSec = DEFAULT_ANNOUNCE_INTERVAL_SEC; - t->announceMinIntervalSec = DEFAULT_ANNOUNCE_MIN_INTERVAL_SEC; - generateKeyParam( t->key_param, KEYLEN ); - - t->publisher = tr_publisherNew( ); - t->timesDownloaded = -1; - t->seederCount = -1; - t->leecherCount = -1; - t->manualAnnounceAllowedAt = ~(time_t)0; - t->name = tr_strdup( info->name ); - memcpy( t->hash, info->hash, SHA_DIGEST_LENGTH ); - escape( t->escaped, info->hash, SHA_DIGEST_LENGTH ); - - for( sum=i=0; itrackerTiers; ++i ) - sum += info->trackerList[i].count; - t->addresses = nwalk = tr_new0( tr_tracker_info, sum ); - t->addressIndex = 0; - t->addressCount = sum; - t->tierFronts = iwalk = tr_new0( int, sum ); - - for( i=0; itrackerTiers; ++i ) - { - const int tierFront = nwalk - t->addresses; - - for( j=0; jtrackerList[i].count; ++j ) - { - const tr_tracker_info * src = &info->trackerList[i].list[j]; - nwalk->address = tr_strdup( src->address ); - nwalk->port = src->port; - nwalk->announce = tr_strdup( src->announce ); - nwalk->scrape = tr_strdup( src->scrape ); - ++nwalk; - - *iwalk++ = tierFront; - } - } - - assert( nwalk - t->addresses == sum ); - assert( iwalk - t->tierFronts == sum ); - - /* scrape sometime in the next two minutes. - scrapes are staggered out like this to prevent - hundreds of scrapes from going out at the same time */ - t->scrapeTimer = tr_timerNew( t->handle, - onScrapeNow, t, - tr_rand(120)*1000 ); - - return t; -} - -static void -onTrackerFreeNow( void * vt ) -{ - int i; - tr_tracker * t = vt; - - tr_timerFree( &t->scrapeTimer ); - tr_timerFree( &t->reannounceTimer ); - tr_publisherFree( &t->publisher ); - tr_free( t->name ); - tr_free( t->trackerID ); - tr_free( t->lastRequest ); - - /* addresses... */ - for( i=0; iaddressCount; ++i ) - tr_trackerInfoClear( &t->addresses[i] ); - tr_free( t->addresses ); - tr_free( t->tierFronts ); - - /* redirect... */ - if( t->redirect ) { - tr_trackerInfoClear( t->redirect ); - tr_free( t->redirect ); - } - - tr_free( t ); -} - -void -tr_trackerFree( tr_tracker * t ) -{ - tr_runInEventThread( t->handle, onTrackerFreeNow, t ); -} - -/*** -**** UTIL -***/ - static int parseBencResponse( struct evhttp_request * req, benc_val_t * setme ) { @@ -426,237 +343,6 @@ updateAddresses( tr_tracker * t, const struct evhttp_request * req ) return ret; } -static tr_tracker_info * -getCurrentAddress( const tr_tracker * t ) -{ - assert( t->addresses != NULL ); - assert( t->addressIndex >= 0 ); - assert( t->addressIndex < t->addressCount ); - - return t->redirect ? t->redirect - : t->addresses + t->addressIndex; -} -static int -trackerSupportsScrape( const tr_tracker * t ) -{ - const tr_tracker_info * info = getCurrentAddress( t ); - - return ( info != NULL ) - && ( info->scrape != NULL ) - && ( info->scrape[0] != '\0' ); -} - - -static void -addCommonHeaders( const tr_tracker * t, - struct evhttp_request * req ) -{ - char buf[1024]; - const tr_tracker_info * address = getCurrentAddress( t ); - snprintf( buf, sizeof(buf), "%s:%d", address->address, address->port ); - evhttp_add_header( req->output_headers, "Host", buf ); - evhttp_add_header( req->output_headers, "Connection", "close" ); - evhttp_add_header( req->output_headers, "Content-Length", "0" ); - evhttp_add_header( req->output_headers, "User-Agent", - TR_NAME "/" LONG_VERSION_STRING ); -} - -/** -*** -**/ - -struct torrent_hash -{ - tr_handle * handle; - uint8_t hash[SHA_DIGEST_LENGTH]; -}; - -static struct torrent_hash* -torrentHashNew( tr_tracker * t ) -{ - struct torrent_hash * data = tr_new( struct torrent_hash, 1 ); - data->handle = t->handle; - memcpy( data->hash, t->hash, SHA_DIGEST_LENGTH ); - return data; -} - -tr_tracker * -findTrackerFromHash( struct torrent_hash * data ) -{ - tr_torrent * torrent = tr_torrentFindFromHash( data->handle, data->hash ); - return torrent ? torrent->tracker : NULL; -} - -/*** -**** -**** SCRAPE -**** -***/ - -static void -onScrapeResponse( struct evhttp_request * req, void * vhash ) -{ - const char * warning; - time_t nextScrapeSec = 60; - tr_tracker * t; - - t = findTrackerFromHash( vhash ); - tr_free( vhash ); - if( t == NULL ) /* tracker's been closed... */ - return; - - tr_inf( "Got scrape response for '%s': %s", - t->name, - ( ( req && req->response_code_line ) ? req->response_code_line - : "(null)") ); - - if( req && ( req->response_code == HTTP_OK ) ) - { - benc_val_t benc, *files; - const int bencLoaded = !parseBencResponse( req, &benc ); - - if( bencLoaded - && (( files = tr_bencDictFind( &benc, "files" ) )) - && ( files->type == TYPE_DICT ) ) - { - int i; - for( i=0; ival.l.count; i+=2 ) - { - const uint8_t* hash = - (const uint8_t*) files->val.l.vals[i].val.s.s; - benc_val_t *tmp, *flags; - benc_val_t *tordict = &files->val.l.vals[i+1]; - if( memcmp( t->hash, hash, SHA_DIGEST_LENGTH ) ) - continue; - - publishErrorClear( t ); - - if(( tmp = tr_bencDictFind( tordict, "complete" ))) - t->seederCount = tmp->val.i; - - if(( tmp = tr_bencDictFind( tordict, "incomplete" ))) - t->leecherCount = tmp->val.i; - - if(( tmp = tr_bencDictFind( tordict, "downloaded" ))) - t->timesDownloaded = tmp->val.i; - - if(( flags = tr_bencDictFind( tordict, "flags" ))) - if(( tmp = tr_bencDictFind( flags, "min_request_interval"))) - t->scrapeIntervalSec = tmp->val.i; - - tr_dbg( "Torrent '%s' scrape successful." - " Rescraping in %d seconds", - t->name, t->scrapeIntervalSec ); - - nextScrapeSec = t->scrapeIntervalSec; - } - } - - if( bencLoaded ) - tr_bencFree( &benc ); - } - - if (( warning = updateAddresses( t, req ) )) { - tr_err( warning ); - publishWarning( t, warning ); - } - - tr_timerFree( &t->scrapeTimer ); - - t->scrapeTimer = tr_timerNew( t->handle, - onScrapeNow, t, - nextScrapeSec*1000 ); -} - -static int -onScrapeNow( void * vt ) -{ - tr_tracker * t = vt; - const tr_tracker_info * address = getCurrentAddress( t ); - - if( trackerSupportsScrape( t ) ) - { - char * uri; - struct evhttp_connection * evcon; - struct evhttp_request *req; - struct evbuffer * buf = evbuffer_new( ); - - evbuffer_add_printf( buf, "%s%sinfo_hash=%s", - address->scrape, - ( strchr(address->scrape, '?') == NULL ? "?" : "&" ), - t->escaped ); - uri = tr_strdup( (char*) EVBUFFER_DATA( buf ) ); - evbuffer_free( buf ); - - tr_inf( "Sending scrape to tracker %s:%d: %s", - address->address, address->port, uri ); - - evcon = getConnection( t, address->address, address->port ); - evhttp_connection_set_timeout( evcon, TIMEOUT_INTERVAL_SEC ); - req = evhttp_request_new( onScrapeResponse, torrentHashNew( t ) ); - addCommonHeaders( t, req ); - tr_evhttp_make_request( t->handle, evcon, req, EVHTTP_REQ_GET, uri ); - } - - t->scrapeTimer = NULL; - return FALSE; -} - -/*** -**** -**** TRACKER REQUESTS -**** -***/ - -static char* -buildTrackerRequestURI( const tr_tracker * t, - const tr_torrent * torrent, - const char * eventName ) -{ - const int isStopping = !strcmp( eventName, "stopped" ); - const int numwant = isStopping ? 0 : NUMWANT; - struct evbuffer * buf = evbuffer_new( ); - char * ret; - - char * ann = getCurrentAddress(t)->announce; - - evbuffer_add_printf( buf, "%s" - "%cinfo_hash=%s" - "&peer_id=%s" - "&port=%d" - "&uploaded=%"PRIu64 - "&downloaded=%"PRIu64 - "&corrupt=%"PRIu64 - "&left=%"PRIu64 - "&compact=1" - "&numwant=%d" - "&key=%s" - "&supportcrypto=1" - "&requirecrypto=%d" - "%s%s" - "%s%s", - ann, - strchr(ann, '?') ? '&' : '?', - t->escaped, - t->peer_id, - tr_sharedGetPublicPort( t->handle->shared ), - torrent->uploadedCur, - torrent->downloadedCur, - torrent->corruptCur, - tr_cpLeftUntilComplete( torrent->completion ), - numwant, - t->key_param, - ( t->handle->encryptionMode==TR_ENCRYPTION_REQUIRED ? 1 : 0 ), - ( ( eventName && *eventName ) ? "&event=" : "" ), - ( ( eventName && *eventName ) ? eventName : "" ), - ( ( t->trackerID && *t->trackerID ) ? "&trackerid=" : "" ), - ( ( t->trackerID && *t->trackerID ) ? t->trackerID : "" ) ); - - ret = tr_strdup( (char*) EVBUFFER_DATA( buf ) ); - evbuffer_free( buf ); - return ret; -} - /* Convert to compact form */ static uint8_t * parseOldPeers( benc_val_t * bePeers, int * setmePeerCount ) @@ -696,15 +382,6 @@ parseOldPeers( benc_val_t * bePeers, int * setmePeerCount ) return compact; } -static int onRetry( void * vt ); -static int onReannounce( void * vt ); - -static void -onStoppedResponse( struct evhttp_request * req UNUSED, void * handle UNUSED ) -{ - dbgmsg( NULL, "got a response to some `stop' message" ); -} - static void onTrackerResponse( struct evhttp_request * req, void * torrent_hash ) { @@ -809,11 +486,8 @@ onTrackerResponse( struct evhttp_request * req, void * torrent_hash ) { dbgmsg( t, "request succeeded. reannouncing in %d seconds", t->announceIntervalSec ); - t->manualAnnounceAllowedAt = time(NULL) - + t->announceMinIntervalSec; - t->reannounceTimer = tr_timerNew( t->handle, - onReannounce, t, - t->announceIntervalSec * 1000 ); + t->reannounceAt = time(NULL) + t->announceIntervalSec; + t->manualAnnounceAllowedAt = time(NULL) + t->announceMinIntervalSec; } else if( 300<=responseCode && responseCode<=399 ) { @@ -821,7 +495,8 @@ onTrackerResponse( struct evhttp_request * req, void * torrent_hash ) /* it's a redirect... updateAddresses() has already * parsed the redirect, all that's left is to retry */ - onRetry( t ); + t->reannounceAt = time(NULL); + t->manualAnnounceAllowedAt = time(NULL) + t->announceMinIntervalSec; } else if( 400<=responseCode && responseCode<=499 ) { @@ -833,7 +508,7 @@ onTrackerResponse( struct evhttp_request * req, void * torrent_hash ) if( req && req->response_code_line ) publishErrorMessage( t, req->response_code_line ); t->manualAnnounceAllowedAt = ~(time_t)0; - t->reannounceTimer = NULL; + t->reannounceAt = 0; } else if( 500<=responseCode && responseCode<=599 ) { @@ -846,95 +521,495 @@ onTrackerResponse( struct evhttp_request * req, void * torrent_hash ) if( req && req->response_code_line ) publishWarning( t, req->response_code_line ); t->manualAnnounceAllowedAt = ~(time_t)0; - t->reannounceTimer = tr_timerNew( t->handle, onRetry, t, 15 * 1000 ); + t->reannounceAt = time(NULL) + 15; } else { - dbgmsg( t, "Invalid response from tracker... retrying in 120 seconds." ); + dbgmsg( t, "Invalid response from tracker... retrying in 60 seconds." ); /* WTF did we get?? */ if( req && req->response_code_line ) publishErrorMessage( t, req->response_code_line ); t->manualAnnounceAllowedAt = ~(time_t)0; - t->reannounceTimer = tr_timerNew( t->handle, onRetry, t, 120 * 1000 ); + t->reannounceAt = time(NULL) + 60; } } -static int -sendTrackerRequest( void * vt, const char * eventName ) +static void +onScrapeResponse( struct evhttp_request * req, void * vhash ) { - tr_tracker * t = vt; - const int isStopping = eventName && !strcmp( eventName, "stopped" ); - const tr_tracker_info * address = getCurrentAddress( t ); - char * uri; - struct evhttp_connection * evcon; - const tr_torrent * tor; + const char * warning; + time_t nextScrapeSec = 60; + tr_tracker * t; - tor = tr_torrentFindFromHash( t->handle, t->hash ); - if( tor == NULL ) - return FALSE; + t = findTrackerFromHash( vhash ); + tr_free( vhash ); + if( t == NULL ) /* tracker's been closed... */ + return; - uri = buildTrackerRequestURI( t, tor, eventName ); - - tr_inf( "Torrent \"%s\" sending '%s' to tracker %s:%d: %s", + tr_inf( "Got scrape response for '%s': %s", t->name, - (eventName ? eventName : "periodic announce"), - address->address, address->port, - uri ); + ( ( req && req->response_code_line ) ? req->response_code_line + : "(null)") ); - /* kill any pending requests */ - dbgmsg( t, "clearing announce timer" ); - tr_timerFree( &t->reannounceTimer ); + if( req && ( req->response_code == HTTP_OK ) ) + { + benc_val_t benc, *files; + const int bencLoaded = !parseBencResponse( req, &benc ); - evcon = getConnection( t, address->address, address->port ); - if ( !evcon ) { - tr_err( "Can't make a connection to %s:%d", address->address, address->port ); - tr_free( uri ); - } else { - struct evhttp_request * req; - if( eventName != t->lastRequest ) { - tr_free( t->lastRequest ); - t->lastRequest = tr_strdup( eventName ); + if( bencLoaded + && (( files = tr_bencDictFind( &benc, "files" ) )) + && ( files->type == TYPE_DICT ) ) + { + int i; + for( i=0; ival.l.count; i+=2 ) + { + const uint8_t* hash = + (const uint8_t*) files->val.l.vals[i].val.s.s; + benc_val_t *tmp, *flags; + benc_val_t *tordict = &files->val.l.vals[i+1]; + if( memcmp( t->hash, hash, SHA_DIGEST_LENGTH ) ) + continue; + + publishErrorClear( t ); + + if(( tmp = tr_bencDictFind( tordict, "complete" ))) + t->seederCount = tmp->val.i; + + if(( tmp = tr_bencDictFind( tordict, "incomplete" ))) + t->leecherCount = tmp->val.i; + + if(( tmp = tr_bencDictFind( tordict, "downloaded" ))) + t->timesDownloaded = tmp->val.i; + + if(( flags = tr_bencDictFind( tordict, "flags" ))) + if(( tmp = tr_bencDictFind( flags, "min_request_interval"))) + t->scrapeIntervalSec = tmp->val.i; + + tr_dbg( "Torrent '%s' scrape successful." + " Rescraping in %d seconds", + t->name, t->scrapeIntervalSec ); + + nextScrapeSec = t->scrapeIntervalSec; + } } - if( isStopping ) { - evhttp_connection_set_timeout( evcon, STOP_TIMEOUT_INTERVAL_SEC ); - req = evhttp_request_new( onStoppedResponse, t->handle ); - } else { - evhttp_connection_set_timeout( evcon, TIMEOUT_INTERVAL_SEC ); - req = evhttp_request_new( onTrackerResponse, torrentHashNew(t) ); - } - dbgmsg( t, "sending \"%s\" request to tracker", eventName ? eventName : "reannounce" ); - addCommonHeaders( t, req ); - tr_evhttp_make_request( t->handle, evcon, - req, EVHTTP_REQ_GET, uri ); + if( bencLoaded ) + tr_bencFree( &benc ); } - return FALSE; + if (( warning = updateAddresses( t, req ) )) { + tr_err( warning ); + publishWarning( t, warning ); + } + + t->scrapeAt = time(NULL) + nextScrapeSec; +} + +/*** +**** +***/ + +struct tr_tracker_request +{ + int port; + int timeout; + char * address; + char * uri; + struct evhttp_request * req; +}; + +static void +freeRequest( struct tr_tracker_request ** req ) +{ + tr_free( (*req)->address ); + tr_free( (*req)->uri ); + tr_free( (*req) ); + *req = NULL; +} + +static void +addCommonHeaders( const tr_tracker * t, + struct evhttp_request * req ) +{ + char buf[1024]; + const tr_tracker_info * address = getCurrentAddress( t ); + snprintf( buf, sizeof(buf), "%s:%d", address->address, address->port ); + evhttp_add_header( req->output_headers, "Host", buf ); + evhttp_add_header( req->output_headers, "Connection", "close" ); + evhttp_add_header( req->output_headers, "Content-Length", "0" ); + evhttp_add_header( req->output_headers, "User-Agent", + TR_NAME "/" LONG_VERSION_STRING ); +} + +static char* +buildTrackerRequestURI( const tr_tracker * t, + const tr_torrent * torrent, + const char * eventName ) +{ + const int isStopping = !strcmp( eventName, "stopped" ); + const int numwant = isStopping ? 0 : NUMWANT; + struct evbuffer * buf = evbuffer_new( ); + char * ret; + + char * ann = getCurrentAddress(t)->announce; + + evbuffer_add_printf( buf, "%s" + "%cinfo_hash=%s" + "&peer_id=%s" + "&port=%d" + "&uploaded=%"PRIu64 + "&downloaded=%"PRIu64 + "&corrupt=%"PRIu64 + "&left=%"PRIu64 + "&compact=1" + "&numwant=%d" + "&key=%s" + "&supportcrypto=1" + "&requirecrypto=%d" + "%s%s" + "%s%s", + ann, + strchr(ann, '?') ? '&' : '?', + t->escaped, + t->peer_id, + tr_sharedGetPublicPort( t->handle->shared ), + torrent->uploadedCur, + torrent->downloadedCur, + torrent->corruptCur, + tr_cpLeftUntilComplete( torrent->completion ), + numwant, + t->key_param, + ( t->handle->encryptionMode==TR_ENCRYPTION_REQUIRED ? 1 : 0 ), + ( ( eventName && *eventName ) ? "&event=" : "" ), + ( ( eventName && *eventName ) ? eventName : "" ), + ( ( t->trackerID && *t->trackerID ) ? "&trackerid=" : "" ), + ( ( t->trackerID && *t->trackerID ) ? t->trackerID : "" ) ); + + ret = tr_strdup( (char*) EVBUFFER_DATA( buf ) ); + evbuffer_free( buf ); + return ret; +} + +static struct tr_tracker_request* +createRequest( tr_handle * handle, const tr_tracker * tracker, int reqtype ) +{ + static const char* strings[TR_REQ_COUNT] = { "started", "completed", "stopped", "" }; + const tr_torrent * torrent = tr_torrentFindFromHash( handle, tracker->hash ); + const tr_tracker_info * address = getCurrentAddress( tracker ); + const int isStopping = reqtype == TR_REQ_STOPPED; + const char * eventName = strings[reqtype]; + struct tr_tracker_request * req; + + req = tr_new0( struct tr_tracker_request, 1 ); + req->address = tr_strdup( address->address ); + req->port = address->port; + req->uri = buildTrackerRequestURI( tracker, torrent, eventName ); + req->timeout = isStopping ? STOP_TIMEOUT_INTERVAL_SEC : TIMEOUT_INTERVAL_SEC; + req->req = isStopping + ? evhttp_request_new( onStoppedResponse, handle ) + : evhttp_request_new( onTrackerResponse, torrentHashNew(handle, tracker) ); + addCommonHeaders( tracker, req->req ); + + return req; +} + +static struct tr_tracker_request* +createScrape( tr_handle * handle, const tr_tracker * tracker ) +{ + const tr_tracker_info * a = getCurrentAddress( tracker ); + struct tr_tracker_request * req; + + req = tr_new0( struct tr_tracker_request, 1 ); + req->address = tr_strdup( a->address ); + req->port = a->port; + req->timeout = TIMEOUT_INTERVAL_SEC; + req->req = evhttp_request_new( onScrapeResponse, torrentHashNew( handle, tracker ) ); + tr_asprintf( &req->uri, "%s%cinfo_hash=%s", a->scrape, strchr(a->scrape,'?')?'&':'?', tracker->escaped ); + addCommonHeaders( tracker, req->req ); + + return req; +} + +struct tr_tracker_handle +{ + int socketCount; + tr_timer * pulseTimer; + tr_list * requestQueue; + tr_list * scrapeQueue; +}; + +static int pulse( void * vhandle ); + +static void +ensureGlobalsExist( tr_handle * handle ) +{ + if( handle->tracker == NULL ) + { + handle->tracker = tr_new0( struct tr_tracker_handle, 1 ); + handle->tracker->pulseTimer = tr_timerNew( handle, pulse, handle, PULSE_INTERVAL_SEC*1000 ); + dbgmsg( NULL, "creating tracker timer" ); + } } static int -onReannounce( void * vt ) +maybeFreeGlobals( tr_handle * handle ) { - tr_tracker * t = vt; - dbgmsg( t, "onReannounce" ); - sendTrackerRequest( t, "" ); - dbgmsg( t, "onReannounce setting announceTimer to NULL" ); - t->reannounceTimer = NULL; + int globalsExist = handle->tracker != NULL; + + if( globalsExist + && ( handle->tracker->socketCount < 1 ) + && ( handle->tracker->requestQueue == NULL ) + && ( handle->tracker->scrapeQueue == NULL ) + && ( handle->torrentList== NULL ) ) + { + dbgmsg( NULL, "freeing tracker timer" ); + tr_timerFree( &handle->tracker->pulseTimer ); + tr_free( handle->tracker ); + handle->tracker = NULL; + globalsExist = FALSE; + } + + return globalsExist; +} + +/*** +**** +***/ + +static int +freeConnection( void * evcon ) +{ + evhttp_connection_free( evcon ); return FALSE; } +static void +connectionClosedCB( struct evhttp_connection * evcon, void * vhandle ) +{ + tr_handle * handle = vhandle; + + assert( handle ); + assert( handle->tracker ); + + /* libevent references evcon right after calling this function, + so we can't free it yet... defer it to after this call chain + has played out */ + tr_timerNew( handle, freeConnection, evcon, 100 ); + + --handle->tracker->socketCount; + dbgmsg( NULL, "decrementing socket count to %d", handle->tracker->socketCount ); +} + +static struct evhttp_connection* +getConnection( tr_handle * handle, const char * address, int port ) +{ + struct evhttp_connection * c = evhttp_connection_new( address, port ); + evhttp_connection_set_closecb( c, connectionClosedCB, handle ); + return c; +} + +static void +invokeRequest( tr_handle * handle, const struct tr_tracker_request * req ) +{ + struct evhttp_connection * evcon = getConnection( handle, req->address, req->port ); + ++handle->tracker->socketCount; + dbgmsg( NULL, "incrementing socket count to %d, sending '%s' to tracker %s:%d", handle->tracker->socketCount, req->uri, req->address, req->port ); + evhttp_connection_set_timeout( evcon, req->timeout ); + evhttp_make_request( evcon, req->req, EVHTTP_REQ_GET, req->uri ); +} + +static void +invokeNextInQueue( tr_handle * handle, tr_list ** list ) +{ + struct tr_tracker_request * req = tr_list_pop_front( list ); + invokeRequest( handle, req ); + freeRequest( &req ); +} static int -onRetry( void * vt ) +socketIsAvailable( tr_handle * handle ) { - tr_tracker * t = vt; - dbgmsg( t, "onRetry" ); - sendTrackerRequest( t, t->lastRequest ); - dbgmsg( t, "onRetry setting announceTimer to NULL" ); - t->reannounceTimer = NULL; - return FALSE; + return handle->tracker->socketCount < MAX_TRACKER_SOCKETS; } +static void ensureGlobalsExist( tr_handle * ); + +static void +enqueueRequest( tr_handle * handle, const tr_tracker * tracker, int reqtype ) +{ + struct tr_tracker_request * req; + ensureGlobalsExist( handle ); + req = createRequest( handle, tracker, reqtype ); + tr_list_append( &handle->tracker->requestQueue, req ); +} + +static void +enqueueScrape( tr_handle * handle, const tr_tracker * tracker ) +{ + struct tr_tracker_request * req; + ensureGlobalsExist( handle ); + req = createScrape( handle, tracker ); + tr_list_append( &handle->tracker->scrapeQueue, req ); +} + +static int +pulse( void * vhandle ) +{ + tr_handle * handle = vhandle; + struct tr_tracker_handle * th = handle->tracker; + tr_torrent * t; + const time_t now = time( NULL ); + + if( handle->tracker->socketCount || tr_list_size(th->requestQueue) || tr_list_size(th->scrapeQueue) ) + dbgmsg( NULL, "tracker pulse... %d sockets, %d reqs left, %d scrapes left", handle->tracker->socketCount, tr_list_size(th->requestQueue), tr_list_size(th->scrapeQueue) ); + + /* upkeep: queue periodic rescrape / reannounce */ + for( t=handle->torrentList; t; t=t->next ) { + tr_tracker * tracker = t->tracker; + if( tracker->scrapeAt && trackerSupportsScrape( tracker ) && ( now >= tracker->scrapeAt ) ) { + tracker->scrapeAt = 0; + enqueueScrape( handle, tracker ); + } + if( tracker->reannounceAt && tracker->isRunning && ( now >= tracker->reannounceAt ) ) { + tracker->reannounceAt = 0; + enqueueRequest( handle, tracker, TR_REQ_REANNOUNCE ); + } + } + + if( handle->tracker->socketCount || tr_list_size(th->requestQueue) || tr_list_size(th->scrapeQueue) ) + dbgmsg( NULL, "tracker pulse after upkeep... %d sockets, %d reqs left, %d scrapes left", handle->tracker->socketCount, tr_list_size(th->requestQueue), tr_list_size(th->scrapeQueue) ); + + /* look for things to do... process all the requests, then process all the scrapes */ + while( th->requestQueue && socketIsAvailable( handle ) ) + invokeNextInQueue( handle, &th->requestQueue ); + while( th->scrapeQueue && socketIsAvailable( handle ) ) + invokeNextInQueue( handle, &th->scrapeQueue ); + + if( handle->tracker->socketCount || tr_list_size(th->requestQueue) || tr_list_size(th->scrapeQueue) ) + dbgmsg( NULL, "tracker pulse done... %d sockets, %d reqs left, %d scrapes left", handle->tracker->socketCount, tr_list_size(th->requestQueue), tr_list_size(th->scrapeQueue) ); + + return maybeFreeGlobals( handle ); +} + +/*** +**** LIFE CYCLE +***/ + +static void +generateKeyParam( char * msg, int len ) +{ + int i; + const char * pool = "abcdefghijklmnopqrstuvwxyz0123456789"; + const int poolSize = strlen( pool ); + for( i=0; iinfo; + int i, j, sum, *iwalk; + tr_tracker_info * nwalk; + tr_tracker * t; + + tr_dbg( "making a new tracker for \"%s\"", info->primaryAddress ); + + t = tr_new0( tr_tracker, 1 ); + t->handle = torrent->handle; + t->scrapeIntervalSec = DEFAULT_SCRAPE_INTERVAL_SEC; + t->announceIntervalSec = DEFAULT_ANNOUNCE_INTERVAL_SEC; + t->announceMinIntervalSec = DEFAULT_ANNOUNCE_MIN_INTERVAL_SEC; + generateKeyParam( t->key_param, KEYLEN ); + + t->publisher = tr_publisherNew( ); + t->timesDownloaded = -1; + t->seederCount = -1; + t->leecherCount = -1; + t->manualAnnounceAllowedAt = ~(time_t)0; + t->name = tr_strdup( info->name ); + memcpy( t->hash, info->hash, SHA_DIGEST_LENGTH ); + escape( t->escaped, info->hash, SHA_DIGEST_LENGTH ); + + for( sum=i=0; itrackerTiers; ++i ) + sum += info->trackerList[i].count; + t->addresses = nwalk = tr_new0( tr_tracker_info, sum ); + t->addressIndex = 0; + t->addressCount = sum; + t->tierFronts = iwalk = tr_new0( int, sum ); + + for( i=0; itrackerTiers; ++i ) + { + const int tierFront = nwalk - t->addresses; + + for( j=0; jtrackerList[i].count; ++j ) + { + const tr_tracker_info * src = &info->trackerList[i].list[j]; + nwalk->address = tr_strdup( src->address ); + nwalk->port = src->port; + nwalk->announce = tr_strdup( src->announce ); + nwalk->scrape = tr_strdup( src->scrape ); + ++nwalk; + + *iwalk++ = tierFront; + } + } + + assert( nwalk - t->addresses == sum ); + assert( iwalk - t->tierFronts == sum ); + + if( trackerSupportsScrape( t ) ) + enqueueScrape( t->handle, t ); + + return t; +} + +static void +onTrackerFreeNow( void * vt ) +{ + int i; + tr_tracker * t = vt; + + tr_publisherFree( &t->publisher ); + tr_free( t->name ); + tr_free( t->trackerID ); + tr_free( t->lastRequest ); + + /* addresses... */ + for( i=0; iaddressCount; ++i ) + tr_trackerInfoClear( &t->addresses[i] ); + tr_free( t->addresses ); + tr_free( t->tierFronts ); + + /* redirect... */ + if( t->redirect ) { + tr_trackerInfoClear( t->redirect ); + tr_free( t->redirect ); + } + + tr_free( t ); +} + +void +tr_trackerFree( tr_tracker * t ) +{ + tr_runInEventThread( t->handle, onTrackerFreeNow, t ); +} + + /*** **** PUBLIC ***/ @@ -983,60 +1058,35 @@ tr_trackerGetCounts( const tr_tracker * t, *setme_seederCount = t->seederCount; } -struct request_data -{ - tr_tracker * t; - const char * command; -}; - -static void -sendRequestFromEventThreadImpl( void * vdata ) -{ - struct request_data * data = vdata; - sendTrackerRequest( data->t, data->command ); - tr_free( data ); -} - -static void -sendRequestFromEventThread( tr_tracker * t, const char * command ) -{ - struct request_data * data = tr_new( struct request_data, 1 ); - data->t = t; - data->command = command; - tr_runInEventThread( t->handle, sendRequestFromEventThreadImpl, data ); -} void tr_trackerStart( tr_tracker * t ) { tr_peerIdNew( t->peer_id, sizeof(t->peer_id) ); - - if( !t->reannounceTimer && !t->isRunning ) - { + if( t->isRunning == 0 ) { t->isRunning = 1; - sendRequestFromEventThread( t, "started" ); + enqueueRequest( t->handle, t, TR_REQ_STARTED ); } } void tr_trackerReannounce( tr_tracker * t ) { - sendRequestFromEventThread( t, "started" ); + enqueueRequest( t->handle, t, TR_REQ_REANNOUNCE ); } void tr_trackerCompleted( tr_tracker * t ) { - sendRequestFromEventThread( t, "completed" ); + enqueueRequest( t->handle, t, TR_REQ_COMPLETED ); } void tr_trackerStop( tr_tracker * t ) { - if( t->isRunning ) - { + if( t->isRunning ) { t->isRunning = 0; - sendRequestFromEventThread( t, "stopped" ); + enqueueRequest( t->handle, t, TR_REQ_STOPPED ); } } diff --git a/libtransmission/transmission.c b/libtransmission/transmission.c index df4031a2a..9af89f2dd 100644 --- a/libtransmission/transmission.c +++ b/libtransmission/transmission.c @@ -347,10 +347,12 @@ deadlineReached( const uint64_t deadline ) return tr_date( ) >= deadline; } +#define SHUTDOWN_MAX_SECONDS 30 + void tr_close( tr_handle * h ) { - const int maxwait_msec = 6 * 1000; + const int maxwait_msec = SHUTDOWN_MAX_SECONDS * 1000; const uint64_t deadline = tr_date( ) + maxwait_msec; tr_runInEventThread( h, tr_closeImpl, h ); diff --git a/libtransmission/trevent.c b/libtransmission/trevent.c index e786edf9c..93c83ca45 100644 --- a/libtransmission/trevent.c +++ b/libtransmission/trevent.c @@ -63,7 +63,6 @@ static int writes = 0; enum mode { - TR_EV_EVHTTP_MAKE_REQUEST, TR_EV_TIMER_ADD, TR_EV_EXEC }; @@ -130,11 +129,6 @@ pumpList( int i UNUSED, short s UNUSED, void * veh ) ++eh->timerCount; break; - case TR_EV_EVHTTP_MAKE_REQUEST: - evhttp_make_request( cmd->evcon, cmd->req, cmd->evtype, cmd->uri ); - tr_free( cmd->uri ); - break; - case TR_EV_EXEC: (cmd->func)( cmd->user_data ); break; @@ -242,28 +236,6 @@ tr_amInEventThread( struct tr_handle * handle ) return tr_amInThread( handle->events->thread ); } - -void -tr_evhttp_make_request (tr_handle * handle, - struct evhttp_connection * evcon, - struct evhttp_request * req, - enum evhttp_cmd_type type, - char * uri) -{ - if( tr_amInThread( handle->events->thread ) ) { - evhttp_make_request( evcon, req, type, uri ); - tr_free( uri ); - } else { - struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 ); - cmd->mode = TR_EV_EVHTTP_MAKE_REQUEST; - cmd->evcon = evcon; - cmd->req = req; - cmd->evtype = type; - cmd->uri = uri; - pushList( handle->events, cmd ); - } -} - /** *** **/ diff --git a/libtransmission/trevent.h b/libtransmission/trevent.h index b9ddffe66..bdcb1685c 100644 --- a/libtransmission/trevent.h +++ b/libtransmission/trevent.h @@ -31,12 +31,6 @@ struct evhttp_request; struct evhttp_connection; struct bufferevent; -void tr_evhttp_make_request (struct tr_handle * tr_handle, - struct evhttp_connection * evcon, - struct evhttp_request * req, - enum evhttp_cmd_type type, - char * uri); - /** *** **/