experimental code that should make tracker interaction (especially shutdown) much faster.

This commit is contained in:
Charles Kerr 2007-12-27 21:48:41 +00:00
parent 3671723597
commit f88eb7d771
1 changed files with 58 additions and 29 deletions

View File

@ -661,8 +661,6 @@ addCommonHeaders( const tr_tracker * t,
const tr_tracker_info * address = getCurrentAddress( t ); const tr_tracker_info * address = getCurrentAddress( t );
snprintf( buf, sizeof(buf), "%s:%d", address->address, address->port ); snprintf( buf, sizeof(buf), "%s:%d", address->address, address->port );
evhttp_add_header( req->output_headers, "Host", buf ); evhttp_add_header( req->output_headers, "Host", buf );
evhttp_add_header( req->output_headers, "Connection", "close" );
evhttp_add_header( req->output_headers, "Content-Length", "0" );
evhttp_add_header( req->output_headers, "User-Agent", evhttp_add_header( req->output_headers, "User-Agent",
TR_NAME "/" LONG_VERSION_STRING ); TR_NAME "/" LONG_VERSION_STRING );
} }
@ -762,11 +760,11 @@ createScrape( tr_handle * handle, const tr_tracker * tracker )
struct tr_tracker_handle struct tr_tracker_handle
{ {
int socketCount;
unsigned int isShuttingDown : 1; unsigned int isShuttingDown : 1;
tr_timer * pulseTimer; tr_timer * pulseTimer;
tr_list * requestQueue; tr_list * requestQueue;
tr_list * scrapeQueue; tr_list * scrapeQueue;
tr_list * connectionPool;
}; };
static int pulse( void * vhandle ); static int pulse( void * vhandle );
@ -806,7 +804,7 @@ maybeFreeGlobals( tr_handle * handle )
int globalsExist = handle->tracker != NULL; int globalsExist = handle->tracker != NULL;
if( globalsExist if( globalsExist
&& ( handle->tracker->socketCount < 1 ) && ( handle->tracker->connectionPool == NULL )
&& ( handle->tracker->requestQueue == NULL ) && ( handle->tracker->requestQueue == NULL )
&& ( handle->tracker->scrapeQueue == NULL ) && ( handle->tracker->scrapeQueue == NULL )
&& ( handle->torrentList== NULL ) ) && ( handle->torrentList== NULL ) )
@ -831,22 +829,68 @@ freeConnection( void * evcon )
evhttp_connection_free( evcon ); evhttp_connection_free( evcon );
return FALSE; return FALSE;
} }
static int
connectionPoolCompare( const void * a, const void * b )
{
return a != b;
}
static void static void
connectionClosedCB( struct evhttp_connection * evcon, void * vhandle ) connectionClosedCB( struct evhttp_connection * evcon, void * vhandle )
{ {
char * address;
unsigned short port;
tr_handle * handle = vhandle; tr_handle * handle = vhandle;
evhttp_connection_get_peer( evcon, &address, &port );
dbgmsg( NULL, "%s:%d evcon [%s:%d] has closed... removing from pool", __FILE__, __LINE__, address, port );
tr_list_remove( &handle->tracker->connectionPool,
evcon,
connectionPoolCompare );
/* libevent references evcon right after calling this function, /* libevent references evcon right after calling this function,
so we can't free it yet... defer it to after this call chain so we can't free it yet... defer it to after this call chain
has played out */ has played out */
tr_timerNew( handle, freeConnection, evcon, 100 ); tr_timerNew( handle, freeConnection, evcon, 100 );
} }
static struct evhttp_connection*
getExistingConnection( tr_handle * handle, const char * address, int port )
{
tr_list * l;
dbgmsg( NULL, "%s:%d looking for a connection to {%s:%d}\n", __FILE__, __LINE__, address, port );
for( l=handle->tracker->connectionPool; l!=NULL; l=l->next )
{
char * c_address;
unsigned short c_port;
evhttp_connection_get_peer( l->data, &c_address, &c_port );
dbgmsg( NULL, "comparing with [%s:%d]...", c_address, c_port );
if( ( port == c_port ) && !strcmp( address, c_address ) ) {
dbgmsg( NULL, "%s:%d reusing cached connection of [%s:%d]\n", __FILE__, __LINE__, address, port );
return l->data;
}
}
dbgmsg( NULL, "%s:%d no match.\n", __FILE__, __LINE__ );
return NULL;
}
static struct evhttp_connection* static struct evhttp_connection*
getConnection( tr_handle * handle, const char * address, int port ) getConnection( tr_handle * handle, const char * address, int port )
{ {
struct evhttp_connection * c = evhttp_connection_new( address, port ); struct evhttp_connection * c = getExistingConnection( handle, address, port );
evhttp_connection_set_closecb( c, connectionClosedCB, handle );
if( c == NULL )
{
dbgmsg( NULL, "%s:%d creating new connection for [%s:%d]\n", __FILE__, __LINE__, address, port );
c = evhttp_connection_new( address, port );
evhttp_connection_set_closecb( c, connectionClosedCB, handle );
tr_list_prepend( &handle->tracker->connectionPool, c );
}
return c; return c;
} }
@ -859,10 +903,6 @@ invokeRequest( tr_handle * handle, const struct tr_tracker_request * req )
evhttp_connection_set_timeout( evcon, req->timeout ); evhttp_connection_set_timeout( evcon, req->timeout );
if( evhttp_make_request( evcon, req->req, EVHTTP_REQ_GET, req->uri )) if( evhttp_make_request( evcon, req->req, EVHTTP_REQ_GET, req->uri ))
publishErrorMessageAndStop( t, "Tracker could not be reached." ); publishErrorMessageAndStop( t, "Tracker could not be reached." );
else {
++handle->tracker->socketCount;
dbgmsg( t, "incremented socket count to %d", handle->tracker->socketCount );
}
} }
static void static void
@ -873,15 +913,6 @@ invokeNextInQueue( tr_handle * handle, tr_list ** list )
freeRequest( req ); freeRequest( req );
} }
static int
socketIsAvailable( tr_handle * handle )
{
const int max = handle->tracker->isShuttingDown
? MAX_TRACKER_SOCKETS_DURING_SHUTDOWN
: MAX_TRACKER_SOCKETS;
return handle->tracker->socketCount < max;
}
static void ensureGlobalsExist( tr_handle * ); static void ensureGlobalsExist( tr_handle * );
static void static void
@ -913,8 +944,8 @@ pulse( void * vhandle )
if( handle->tracker == NULL ) if( handle->tracker == NULL )
return FALSE; return FALSE;
if( handle->tracker->socketCount || tr_list_size(th->requestQueue) || tr_list_size(th->scrapeQueue) ) if( tr_list_size(th->connectionPool) || tr_list_size(th->requestQueue) || tr_list_size(th->scrapeQueue) )
dbgmsg( NULL, "tracker pulse... %d sockets, %d reqs left, %d scrapes left", handle->tracker->socketCount, tr_list_size(th->requestQueue), tr_list_size(th->scrapeQueue) ); dbgmsg( NULL, "tracker pulse... %d sockets, %d reqs left, %d scrapes left", tr_list_size(th->connectionPool), tr_list_size(th->requestQueue), tr_list_size(th->scrapeQueue) );
/* upkeep: queue periodic rescrape / reannounce */ /* upkeep: queue periodic rescrape / reannounce */
for( tor=handle->torrentList; tor; tor=tor->next ) for( tor=handle->torrentList; tor; tor=tor->next )
@ -932,17 +963,17 @@ pulse( void * vhandle )
} }
} }
if( handle->tracker->socketCount || tr_list_size(th->requestQueue) || tr_list_size(th->scrapeQueue) ) if( tr_list_size(th->connectionPool) || tr_list_size(th->requestQueue) || tr_list_size(th->scrapeQueue) )
dbgmsg( NULL, "tracker pulse after upkeep... %d sockets, %d reqs left, %d scrapes left", handle->tracker->socketCount, tr_list_size(th->requestQueue), tr_list_size(th->scrapeQueue) ); dbgmsg( NULL, "tracker pulse after upkeep... %d sockets, %d reqs left, %d scrapes left", tr_list_size(th->connectionPool), tr_list_size(th->requestQueue), tr_list_size(th->scrapeQueue) );
/* look for things to do... process all the requests, then process all the scrapes */ /* look for things to do... process all the requests, then process all the scrapes */
while( th->requestQueue && socketIsAvailable( handle ) ) while( th->requestQueue )
invokeNextInQueue( handle, &th->requestQueue ); invokeNextInQueue( handle, &th->requestQueue );
while( th->scrapeQueue && socketIsAvailable( handle ) ) while( th->scrapeQueue )
invokeNextInQueue( handle, &th->scrapeQueue ); invokeNextInQueue( handle, &th->scrapeQueue );
if( handle->tracker->socketCount || tr_list_size(th->requestQueue) || tr_list_size(th->scrapeQueue) ) if( tr_list_size(th->connectionPool) || tr_list_size(th->requestQueue) || tr_list_size(th->scrapeQueue) )
dbgmsg( NULL, "tracker pulse done... %d sockets, %d reqs left, %d scrapes left", handle->tracker->socketCount, tr_list_size(th->requestQueue), tr_list_size(th->scrapeQueue) ); dbgmsg( NULL, "tracker pulse done... %d sockets, %d reqs left, %d scrapes left", tr_list_size(th->connectionPool), tr_list_size(th->requestQueue), tr_list_size(th->scrapeQueue) );
return maybeFreeGlobals( handle ); return maybeFreeGlobals( handle );
} }
@ -953,8 +984,6 @@ onReqDone( tr_handle * handle )
if( handle->tracker ) if( handle->tracker )
{ {
pulse( handle ); pulse( handle );
--handle->tracker->socketCount;
dbgmsg( NULL, "decrementing socket count to %d", handle->tracker->socketCount );
} }
} }