From 123a5c53a9d733faf39ff92009492956f70aa8ad Mon Sep 17 00:00:00 2001 From: Charles Kerr Date: Thu, 16 Oct 2008 05:24:57 +0000 Subject: [PATCH] Try to hammer out some of the bugs in web.c. This still feels like a work in progress. Also, add a queue for tracker requests so that we don't wind up with 100s of concurrent http requests when there are a lot of torrents. --- libtransmission/web.c | 108 ++++++++++++++++++++++++++++-------------- 1 file changed, 73 insertions(+), 35 deletions(-) diff --git a/libtransmission/web.c b/libtransmission/web.c index 0f0fb34af..0c6fbae5c 100644 --- a/libtransmission/web.c +++ b/libtransmission/web.c @@ -22,11 +22,14 @@ #include "utils.h" #include "web.h" +#define MAX_CONCURRENT_TASKS 16 + #if LIBCURL_VERSION_NUM < 0x071003 #define curl_multi_socket_action(m,fd,mask,i) curl_multi_socket((m),(fd),(i)) #endif #define dbgmsg( ... ) tr_deepLog( __FILE__, __LINE__, "web", __VA_ARGS__ ) +/* #define dbgmsg(...) do { fprintf( stderr, __VA_ARGS__ ); fprintf( stderr, "\n" ); } while( 0 ) */ struct tr_web { @@ -36,6 +39,7 @@ struct tr_web CURLM * multi; tr_session * session; struct event timer_event; + tr_list * easy_queue; }; /*** @@ -58,6 +62,7 @@ writeFunc( void * ptr, size_t size, size_t nmemb, void * task ) { const size_t byteCount = size * nmemb; evbuffer_add( ((struct tr_web_task*)task)->response, ptr, byteCount ); + dbgmsg( "wrote %zu bytes to task %p's buffer", byteCount, task ); return byteCount; } @@ -81,46 +86,52 @@ addTask( void * vtask ) if( session && session->web ) { struct tr_web * web = session->web; - CURL * ch; - CURLMcode rc; + CURL * easy; dbgmsg( "adding task #%lu [%s]", task->tag, task->url ); - ch = curl_easy_init( ); + easy = curl_easy_init( ); if( !task->range && session->isProxyEnabled ) { - curl_easy_setopt( ch, CURLOPT_PROXY, session->proxy ); - curl_easy_setopt( ch, CURLOPT_PROXYPORT, session->proxyPort ); - curl_easy_setopt( ch, CURLOPT_PROXYTYPE, getCurlProxyType( session->proxyType ) ); - curl_easy_setopt( ch, CURLOPT_PROXYAUTH, CURLAUTH_ANY ); + curl_easy_setopt( easy, CURLOPT_PROXY, session->proxy ); + curl_easy_setopt( easy, CURLOPT_PROXYPORT, session->proxyPort ); + curl_easy_setopt( easy, CURLOPT_PROXYTYPE, getCurlProxyType( session->proxyType ) ); + curl_easy_setopt( easy, CURLOPT_PROXYAUTH, CURLAUTH_ANY ); } if( !task->range && session->isProxyAuthEnabled ) { char * str = tr_strdup_printf( "%s:%s", session->proxyUsername, session->proxyPassword ); - curl_easy_setopt( ch, CURLOPT_PROXYUSERPWD, str ); + curl_easy_setopt( easy, CURLOPT_PROXYUSERPWD, str ); tr_free( str ); } - curl_easy_setopt( ch, CURLOPT_PRIVATE, task ); - curl_easy_setopt( ch, CURLOPT_URL, task->url ); - curl_easy_setopt( ch, CURLOPT_WRITEFUNCTION, writeFunc ); - curl_easy_setopt( ch, CURLOPT_WRITEDATA, task ); - curl_easy_setopt( ch, CURLOPT_USERAGENT, TR_NAME "/" LONG_VERSION_STRING ); - curl_easy_setopt( ch, CURLOPT_SSL_VERIFYHOST, 0 ); - curl_easy_setopt( ch, CURLOPT_SSL_VERIFYPEER, 0 ); - curl_easy_setopt( ch, CURLOPT_FORBID_REUSE, 1 ); - curl_easy_setopt( ch, CURLOPT_NOSIGNAL, 1 ); - curl_easy_setopt( ch, CURLOPT_FOLLOWLOCATION, 1 ); - curl_easy_setopt( ch, CURLOPT_MAXREDIRS, 5 ); - curl_easy_setopt( ch, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4 ); - curl_easy_setopt( ch, CURLOPT_VERBOSE, getenv( "TR_CURL_VERBOSE" ) != NULL ); + curl_easy_setopt( easy, CURLOPT_PRIVATE, task ); + curl_easy_setopt( easy, CURLOPT_URL, task->url ); + curl_easy_setopt( easy, CURLOPT_WRITEFUNCTION, writeFunc ); + curl_easy_setopt( easy, CURLOPT_WRITEDATA, task ); + curl_easy_setopt( easy, CURLOPT_USERAGENT, TR_NAME "/" LONG_VERSION_STRING ); + curl_easy_setopt( easy, CURLOPT_SSL_VERIFYHOST, 0 ); + curl_easy_setopt( easy, CURLOPT_SSL_VERIFYPEER, 0 ); + //curl_easy_setopt( easy, CURLOPT_FORBID_REUSE, 1 ); + curl_easy_setopt( easy, CURLOPT_NOSIGNAL, 1 ); + curl_easy_setopt( easy, CURLOPT_FOLLOWLOCATION, 1 ); + curl_easy_setopt( easy, CURLOPT_MAXREDIRS, 5 ); + curl_easy_setopt( easy, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4 ); + curl_easy_setopt( easy, CURLOPT_VERBOSE, getenv( "TR_CURL_VERBOSE" ) != NULL ); if( task->range ) - curl_easy_setopt( ch, CURLOPT_RANGE, task->range ); + curl_easy_setopt( easy, CURLOPT_RANGE, task->range ); else /* don't set encoding if range is sent; it messes up binary data */ - curl_easy_setopt( ch, CURLOPT_ENCODING, "" ); + curl_easy_setopt( easy, CURLOPT_ENCODING, "" ); - rc = curl_multi_add_handle( web->multi, ch ); - if( rc != CURLM_OK ) - tr_err( "%s", curl_multi_strerror( rc ) ); + if( web->still_running >= MAX_CONCURRENT_TASKS ) { + tr_list_append( &web->easy_queue, easy ); + dbgmsg( " >> adding a task to the curl queue... size is now %d", tr_list_size( web->easy_queue ) ); + } else { + CURLMcode rc = curl_multi_add_handle( web->multi, easy ); + if( rc == CURLM_OK ) + ++web->still_running; + else + tr_err( "%s", curl_multi_strerror( rc ) ); + } } } @@ -164,10 +175,12 @@ webDestroy( tr_web * web ) static void check_run_count( tr_web * g ) { + int added_from_queue = FALSE; + dbgmsg( "check_run_count: prev_running %d, still_running %d", g->prev_running, g->still_running ); - if( g->prev_running > g->still_running ) + if( 1 ) { CURLMsg * msg; int msgs_left; @@ -197,16 +210,38 @@ check_run_count( tr_web * g ) g->prev_running = g->still_running; - if( g->still_running <= 0 ) { - if( evtimer_pending( &g->timer_event, NULL ) ) { - dbgmsg( "deleting the pending global timer" ); - evtimer_del( &g->timer_event ); +dbgmsg( "--> still running: %d ... max: %d ... queue size: %d", g->still_running, MAX_CONCURRENT_TASKS, tr_list_size( g->easy_queue ) ); + + while( ( g->still_running < MAX_CONCURRENT_TASKS ) + && ( tr_list_size( g->easy_queue ) > 0 ) ) + { + CURL * easy = tr_list_pop_front( &g->easy_queue ); + if( easy ) + { + const CURLMcode rc = curl_multi_add_handle( g->multi, easy ); + if( rc != CURLM_OK ) { + tr_err( "%s", curl_multi_strerror( rc ) ); + } else { + dbgmsg( "pumped a task out of the curl queue... %d remain", tr_list_size( g->easy_queue ) ); + added_from_queue = TRUE; + ++g->still_running; + } } } - if( g->dying && ( g->still_running < 1 ) ) { - dbgmsg( "destroying the web global now that all the tasks are done" ); - webDestroy( g ); + if( !added_from_queue ) + { + if( g->still_running <= 0 ) { + if( evtimer_pending( &g->timer_event, NULL ) ) { + dbgmsg( "deleting the pending global timer" ); + evtimer_del( &g->timer_event ); + } + } + + if( g->dying && ( g->still_running < 1 ) ) { + dbgmsg( "destroying the web global now that all the tasks are done" ); + webDestroy( g ); + } } } @@ -232,6 +267,7 @@ event_cb( int fd, short kind, void * vg ) do { dbgmsg( "event_cb calling socket_action fd %d, mask %d", fd, mask ); rc = curl_multi_socket_action( g->multi, fd, mask, &g->still_running ); + dbgmsg( "event_cb(): still_running is %d", g->still_running ); } while( rc == CURLM_CALL_MULTI_PERFORM ); if( rc != CURLM_OK ) tr_err( "%s", curl_multi_strerror( rc ) ); @@ -251,6 +287,7 @@ timer_cb( int socket UNUSED, short action UNUSED, void * vg ) dbgmsg( "timer_cb calling CURL_SOCKET_TIMEOUT" ); rc = curl_multi_socket_action( g->multi, CURL_SOCKET_TIMEOUT, 0, &g->still_running ); + dbgmsg( "timer_cb(): still_running is %d", g->still_running ); } while( rc == CURLM_CALL_MULTI_PERFORM ); if( rc != CURLM_OK ) @@ -276,7 +313,8 @@ setsock( curl_socket_t sockfd, struct tr_web * g, struct tr_web_sockinfo * f ) { - const int kind = (action & CURL_POLL_IN ? EV_READ : 0) + const int kind = EV_PERSIST + | (action & CURL_POLL_IN ? EV_READ : 0) | (action & CURL_POLL_OUT ? EV_WRITE : 0); dbgmsg( "setsock: fd is %d, curl action is %d, libevent action is %d", sockfd, action, kind ); if( f->evset )