Try to hammer out some of the bugs in web.c. This still feels like a work in progress. Also, add a queue for tracker requests so that we don't wind up with 100s of concurrent http requests when there are a lot of torrents.

This commit is contained in:
Charles Kerr 2008-10-16 05:24:57 +00:00
parent 45160cff16
commit 123a5c53a9
1 changed files with 73 additions and 35 deletions

View File

@ -22,11 +22,14 @@
#include "utils.h" #include "utils.h"
#include "web.h" #include "web.h"
#define MAX_CONCURRENT_TASKS 16
#if LIBCURL_VERSION_NUM < 0x071003 #if LIBCURL_VERSION_NUM < 0x071003
#define curl_multi_socket_action(m,fd,mask,i) curl_multi_socket((m),(fd),(i)) #define curl_multi_socket_action(m,fd,mask,i) curl_multi_socket((m),(fd),(i))
#endif #endif
#define dbgmsg( ... ) tr_deepLog( __FILE__, __LINE__, "web", __VA_ARGS__ ) #define dbgmsg( ... ) tr_deepLog( __FILE__, __LINE__, "web", __VA_ARGS__ )
/* #define dbgmsg(...) do { fprintf( stderr, __VA_ARGS__ ); fprintf( stderr, "\n" ); } while( 0 ) */
struct tr_web struct tr_web
{ {
@ -36,6 +39,7 @@ struct tr_web
CURLM * multi; CURLM * multi;
tr_session * session; tr_session * session;
struct event timer_event; struct event timer_event;
tr_list * easy_queue;
}; };
/*** /***
@ -58,6 +62,7 @@ writeFunc( void * ptr, size_t size, size_t nmemb, void * task )
{ {
const size_t byteCount = size * nmemb; const size_t byteCount = size * nmemb;
evbuffer_add( ((struct tr_web_task*)task)->response, ptr, byteCount ); evbuffer_add( ((struct tr_web_task*)task)->response, ptr, byteCount );
dbgmsg( "wrote %zu bytes to task %p's buffer", byteCount, task );
return byteCount; return byteCount;
} }
@ -81,46 +86,52 @@ addTask( void * vtask )
if( session && session->web ) if( session && session->web )
{ {
struct tr_web * web = session->web; struct tr_web * web = session->web;
CURL * ch; CURL * easy;
CURLMcode rc;
dbgmsg( "adding task #%lu [%s]", task->tag, task->url ); dbgmsg( "adding task #%lu [%s]", task->tag, task->url );
ch = curl_easy_init( ); easy = curl_easy_init( );
if( !task->range && session->isProxyEnabled ) { if( !task->range && session->isProxyEnabled ) {
curl_easy_setopt( ch, CURLOPT_PROXY, session->proxy ); curl_easy_setopt( easy, CURLOPT_PROXY, session->proxy );
curl_easy_setopt( ch, CURLOPT_PROXYPORT, session->proxyPort ); curl_easy_setopt( easy, CURLOPT_PROXYPORT, session->proxyPort );
curl_easy_setopt( ch, CURLOPT_PROXYTYPE, getCurlProxyType( session->proxyType ) ); curl_easy_setopt( easy, CURLOPT_PROXYTYPE, getCurlProxyType( session->proxyType ) );
curl_easy_setopt( ch, CURLOPT_PROXYAUTH, CURLAUTH_ANY ); curl_easy_setopt( easy, CURLOPT_PROXYAUTH, CURLAUTH_ANY );
} }
if( !task->range && session->isProxyAuthEnabled ) { if( !task->range && session->isProxyAuthEnabled ) {
char * str = tr_strdup_printf( "%s:%s", session->proxyUsername, session->proxyPassword ); char * str = tr_strdup_printf( "%s:%s", session->proxyUsername, session->proxyPassword );
curl_easy_setopt( ch, CURLOPT_PROXYUSERPWD, str ); curl_easy_setopt( easy, CURLOPT_PROXYUSERPWD, str );
tr_free( str ); tr_free( str );
} }
curl_easy_setopt( ch, CURLOPT_PRIVATE, task ); curl_easy_setopt( easy, CURLOPT_PRIVATE, task );
curl_easy_setopt( ch, CURLOPT_URL, task->url ); curl_easy_setopt( easy, CURLOPT_URL, task->url );
curl_easy_setopt( ch, CURLOPT_WRITEFUNCTION, writeFunc ); curl_easy_setopt( easy, CURLOPT_WRITEFUNCTION, writeFunc );
curl_easy_setopt( ch, CURLOPT_WRITEDATA, task ); curl_easy_setopt( easy, CURLOPT_WRITEDATA, task );
curl_easy_setopt( ch, CURLOPT_USERAGENT, TR_NAME "/" LONG_VERSION_STRING ); curl_easy_setopt( easy, CURLOPT_USERAGENT, TR_NAME "/" LONG_VERSION_STRING );
curl_easy_setopt( ch, CURLOPT_SSL_VERIFYHOST, 0 ); curl_easy_setopt( easy, CURLOPT_SSL_VERIFYHOST, 0 );
curl_easy_setopt( ch, CURLOPT_SSL_VERIFYPEER, 0 ); curl_easy_setopt( easy, CURLOPT_SSL_VERIFYPEER, 0 );
curl_easy_setopt( ch, CURLOPT_FORBID_REUSE, 1 ); //curl_easy_setopt( easy, CURLOPT_FORBID_REUSE, 1 );
curl_easy_setopt( ch, CURLOPT_NOSIGNAL, 1 ); curl_easy_setopt( easy, CURLOPT_NOSIGNAL, 1 );
curl_easy_setopt( ch, CURLOPT_FOLLOWLOCATION, 1 ); curl_easy_setopt( easy, CURLOPT_FOLLOWLOCATION, 1 );
curl_easy_setopt( ch, CURLOPT_MAXREDIRS, 5 ); curl_easy_setopt( easy, CURLOPT_MAXREDIRS, 5 );
curl_easy_setopt( ch, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4 ); curl_easy_setopt( easy, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4 );
curl_easy_setopt( ch, CURLOPT_VERBOSE, getenv( "TR_CURL_VERBOSE" ) != NULL ); curl_easy_setopt( easy, CURLOPT_VERBOSE, getenv( "TR_CURL_VERBOSE" ) != NULL );
if( task->range ) if( task->range )
curl_easy_setopt( ch, CURLOPT_RANGE, task->range ); curl_easy_setopt( easy, CURLOPT_RANGE, task->range );
else /* don't set encoding if range is sent; it messes up binary data */ else /* don't set encoding if range is sent; it messes up binary data */
curl_easy_setopt( ch, CURLOPT_ENCODING, "" ); curl_easy_setopt( easy, CURLOPT_ENCODING, "" );
rc = curl_multi_add_handle( web->multi, ch ); if( web->still_running >= MAX_CONCURRENT_TASKS ) {
if( rc != CURLM_OK ) tr_list_append( &web->easy_queue, easy );
tr_err( "%s", curl_multi_strerror( rc ) ); dbgmsg( " >> adding a task to the curl queue... size is now %d", tr_list_size( web->easy_queue ) );
} else {
CURLMcode rc = curl_multi_add_handle( web->multi, easy );
if( rc == CURLM_OK )
++web->still_running;
else
tr_err( "%s", curl_multi_strerror( rc ) );
}
} }
} }
@ -164,10 +175,12 @@ webDestroy( tr_web * web )
static void static void
check_run_count( tr_web * g ) check_run_count( tr_web * g )
{ {
int added_from_queue = FALSE;
dbgmsg( "check_run_count: prev_running %d, still_running %d", dbgmsg( "check_run_count: prev_running %d, still_running %d",
g->prev_running, g->still_running ); g->prev_running, g->still_running );
if( g->prev_running > g->still_running ) if( 1 )
{ {
CURLMsg * msg; CURLMsg * msg;
int msgs_left; int msgs_left;
@ -197,16 +210,38 @@ check_run_count( tr_web * g )
g->prev_running = g->still_running; g->prev_running = g->still_running;
if( g->still_running <= 0 ) { dbgmsg( "--> still running: %d ... max: %d ... queue size: %d", g->still_running, MAX_CONCURRENT_TASKS, tr_list_size( g->easy_queue ) );
if( evtimer_pending( &g->timer_event, NULL ) ) {
dbgmsg( "deleting the pending global timer" ); while( ( g->still_running < MAX_CONCURRENT_TASKS )
evtimer_del( &g->timer_event ); && ( tr_list_size( g->easy_queue ) > 0 ) )
{
CURL * easy = tr_list_pop_front( &g->easy_queue );
if( easy )
{
const CURLMcode rc = curl_multi_add_handle( g->multi, easy );
if( rc != CURLM_OK ) {
tr_err( "%s", curl_multi_strerror( rc ) );
} else {
dbgmsg( "pumped a task out of the curl queue... %d remain", tr_list_size( g->easy_queue ) );
added_from_queue = TRUE;
++g->still_running;
}
} }
} }
if( g->dying && ( g->still_running < 1 ) ) { if( !added_from_queue )
dbgmsg( "destroying the web global now that all the tasks are done" ); {
webDestroy( g ); if( g->still_running <= 0 ) {
if( evtimer_pending( &g->timer_event, NULL ) ) {
dbgmsg( "deleting the pending global timer" );
evtimer_del( &g->timer_event );
}
}
if( g->dying && ( g->still_running < 1 ) ) {
dbgmsg( "destroying the web global now that all the tasks are done" );
webDestroy( g );
}
} }
} }
@ -232,6 +267,7 @@ event_cb( int fd, short kind, void * vg )
do { do {
dbgmsg( "event_cb calling socket_action fd %d, mask %d", fd, mask ); dbgmsg( "event_cb calling socket_action fd %d, mask %d", fd, mask );
rc = curl_multi_socket_action( g->multi, fd, mask, &g->still_running ); rc = curl_multi_socket_action( g->multi, fd, mask, &g->still_running );
dbgmsg( "event_cb(): still_running is %d", g->still_running );
} while( rc == CURLM_CALL_MULTI_PERFORM ); } while( rc == CURLM_CALL_MULTI_PERFORM );
if( rc != CURLM_OK ) if( rc != CURLM_OK )
tr_err( "%s", curl_multi_strerror( rc ) ); tr_err( "%s", curl_multi_strerror( rc ) );
@ -251,6 +287,7 @@ timer_cb( int socket UNUSED, short action UNUSED, void * vg )
dbgmsg( "timer_cb calling CURL_SOCKET_TIMEOUT" ); dbgmsg( "timer_cb calling CURL_SOCKET_TIMEOUT" );
rc = curl_multi_socket_action( g->multi, CURL_SOCKET_TIMEOUT, 0, rc = curl_multi_socket_action( g->multi, CURL_SOCKET_TIMEOUT, 0,
&g->still_running ); &g->still_running );
dbgmsg( "timer_cb(): still_running is %d", g->still_running );
} while( rc == CURLM_CALL_MULTI_PERFORM ); } while( rc == CURLM_CALL_MULTI_PERFORM );
if( rc != CURLM_OK ) if( rc != CURLM_OK )
@ -276,7 +313,8 @@ setsock( curl_socket_t sockfd,
struct tr_web * g, struct tr_web * g,
struct tr_web_sockinfo * f ) struct tr_web_sockinfo * f )
{ {
const int kind = (action & CURL_POLL_IN ? EV_READ : 0) const int kind = EV_PERSIST
| (action & CURL_POLL_IN ? EV_READ : 0)
| (action & CURL_POLL_OUT ? EV_WRITE : 0); | (action & CURL_POLL_OUT ? EV_WRITE : 0);
dbgmsg( "setsock: fd is %d, curl action is %d, libevent action is %d", sockfd, action, kind ); dbgmsg( "setsock: fd is %d, curl action is %d, libevent action is %d", sockfd, action, kind );
if( f->evset ) if( f->evset )