<bullwinkle>this time for sure!</bullwinkle>

This commit is contained in:
Charles Kerr 2008-10-18 00:20:37 +00:00
parent a44f3d75a0
commit 00cfd90bb5
1 changed files with 96 additions and 102 deletions

View File

@ -22,20 +22,16 @@
#include "utils.h" #include "utils.h"
#include "web.h" #include "web.h"
#define MAX_CONCURRENT_TASKS 16 #define MAX_CONCURRENT_TASKS 24
#if LIBCURL_VERSION_NUM < 0x071003 #define DEFAULT_TIMER_MSEC 2500
#define curl_multi_socket_action(m,fd,mask,i) curl_multi_socket((m),(fd),(i))
#endif
#define DEFAULT_TIMER_MSEC 2000
#define dbgmsg( ... ) tr_deepLog( __FILE__, __LINE__, "web", __VA_ARGS__ ) #define dbgmsg( ... ) tr_deepLog( __FILE__, __LINE__, "web", __VA_ARGS__ )
/* #define dbgmsg(...) do { fprintf( stderr, __VA_ARGS__ ); fprintf( stderr, "\n" ); } while( 0 ) */ /* #define dbgmsg(...) do { fprintf( stderr, __VA_ARGS__ ); fprintf( stderr, "\n" ); } while( 0 ) */
struct tr_web struct tr_web
{ {
unsigned int dying : 1; unsigned int closing : 1;
int prev_running; int prev_running;
int still_running; int still_running;
long timer_ms; long timer_ms;
@ -58,7 +54,6 @@ struct tr_web_task
tr_session * session; tr_session * session;
tr_web_done_func * done_func; tr_web_done_func * done_func;
void * done_func_user_data; void * done_func_user_data;
long timer_ms;
}; };
static size_t static size_t
@ -118,7 +113,6 @@ addTask( void * vtask )
TR_NAME "/" LONG_VERSION_STRING ); TR_NAME "/" LONG_VERSION_STRING );
curl_easy_setopt( easy, CURLOPT_SSL_VERIFYHOST, 0 ); curl_easy_setopt( easy, CURLOPT_SSL_VERIFYHOST, 0 );
curl_easy_setopt( easy, CURLOPT_SSL_VERIFYPEER, 0 ); curl_easy_setopt( easy, CURLOPT_SSL_VERIFYPEER, 0 );
//curl_easy_setopt( easy, CURLOPT_FORBID_REUSE, 1 );
curl_easy_setopt( easy, CURLOPT_NOSIGNAL, 1 ); curl_easy_setopt( easy, CURLOPT_NOSIGNAL, 1 );
curl_easy_setopt( easy, CURLOPT_FOLLOWLOCATION, 1 ); curl_easy_setopt( easy, CURLOPT_FOLLOWLOCATION, 1 );
curl_easy_setopt( easy, CURLOPT_MAXREDIRS, 5 ); curl_easy_setopt( easy, CURLOPT_MAXREDIRS, 5 );
@ -171,32 +165,19 @@ finish_task( struct tr_web_task * task, long response_code )
} }
static void static void
webDestroy( tr_web * web ) remove_finished_tasks( tr_web * g )
{ {
timeout_del( &web->timer_event );
curl_multi_cleanup( web->multi );
tr_free( web );
}
/* note: this function can free the tr_web if it's been flagged for deletion if( g->prev_running != g->still_running )
and there are no more tasks remaining. so, callers need to make sure to
not reference their g pointer after calling this function */
static void
check_run_count( tr_web * g )
{
int added_from_queue = FALSE;
dbgmsg( "check_run_count: prev_running %d, still_running %d",
g->prev_running, g->still_running );
if( 1 )
{ {
CURLMsg * msg;
int msgs_left;
CURL * easy; CURL * easy;
CURLcode res;
do{ do
{
CURLMsg * msg;
int msgs_left;
CURLcode res;
easy = NULL; easy = NULL;
while(( msg = curl_multi_info_read( g->multi, &msgs_left ))) { while(( msg = curl_multi_info_read( g->multi, &msgs_left ))) {
if( msg->msg == CURLMSG_DONE ) { if( msg->msg == CURLMSG_DONE ) {
@ -205,6 +186,7 @@ check_run_count( tr_web * g )
break; break;
} }
} }
if( easy ) { if( easy ) {
long code; long code;
struct tr_web_task * task; struct tr_web_task * task;
@ -214,13 +196,36 @@ check_run_count( tr_web * g )
curl_easy_cleanup( easy ); curl_easy_cleanup( easy );
finish_task( task, code ); finish_task( task, code );
} }
} while ( easy ); }
while ( easy );
g->prev_running = g->still_running;
} }
}
g->prev_running = g->still_running; static void
stop_timer( tr_web* g )
{
if( evtimer_pending( &g->timer_event, NULL ) )
{
dbgmsg( "deleting the pending global timer" );
evtimer_del( &g->timer_event );
}
}
dbgmsg( "--> still running: %d ... max: %d ... queue size: %d", g->still_running, MAX_CONCURRENT_TASKS, tr_list_size( g->easy_queue ) ); static void
restart_timer( tr_web * g )
{
struct timeval timeout;
stop_timer( g );
dbgmsg( "adding a timeout for %ld seconds from now", g->timer_ms/1000l );
tr_timevalMsec( g->timer_ms, &timeout );
timeout_add( &g->timer_event, &timeout );
}
static void
add_tasks_from_queue( tr_web * g )
{
while( ( g->still_running < MAX_CONCURRENT_TASKS ) while( ( g->still_running < MAX_CONCURRENT_TASKS )
&& ( tr_list_size( g->easy_queue ) > 0 ) ) && ( tr_list_size( g->easy_queue ) > 0 ) )
{ {
@ -228,56 +233,71 @@ dbgmsg( "--> still running: %d ... max: %d ... queue size: %d", g->still_running
if( easy ) if( easy )
{ {
const CURLMcode rc = curl_multi_add_handle( g->multi, easy ); const CURLMcode rc = curl_multi_add_handle( g->multi, easy );
if( rc != CURLM_OK ) { if( rc != CURLM_OK )
tr_err( "%s", curl_multi_strerror( rc ) ); tr_err( "%s", curl_multi_strerror( rc ) );
} else { else {
dbgmsg( "pumped a task out of the curl queue... %d remain", tr_list_size( g->easy_queue ) ); dbgmsg( "pumped a task out of the curl queue... %d remain", tr_list_size( g->easy_queue ) );
added_from_queue = TRUE;
++g->still_running; ++g->still_running;
} }
} }
} }
if( !added_from_queue )
{
if( g->still_running <= 0 ) {
if( evtimer_pending( &g->timer_event, NULL ) ) {
dbgmsg( "deleting the pending global timer" );
evtimer_del( &g->timer_event );
}
}
if( g->dying && ( g->still_running < 1 ) ) {
dbgmsg( "destroying the web global now that all the tasks are done" );
webDestroy( g );
}
}
} }
static void static void
reset_timer( tr_web * g ) webDestroy( tr_web * web )
{ {
struct timeval timeout; stop_timer( web );
curl_multi_cleanup( web->multi );
tr_free( web );
}
if( evtimer_pending( &g->timer_event, NULL ) ) /* note: this function can free the tr_web if it's been flagged for deletion
evtimer_del( &g->timer_event ); and there are no more tasks remaining. so, callers need to make sure to
not reference their g pointer after calling this function */
static void
tr_multi_socket_action( tr_web * g, int fd, int mask )
{
int closed = FALSE;
CURLMcode rc;
dbgmsg( "adding a timeout for %ld seconds from now", g->timer_ms/1000l ); dbgmsg( "check_run_count: prev_running %d, still_running %d",
tr_timevalMsec( g->timer_ms, &timeout ); g->prev_running, g->still_running );
timeout_add( &g->timer_event, &timeout );
/* invoke libcurl's processing */
do {
dbgmsg( "event_cb calling socket_action fd %d, mask %d", fd, mask );
rc = curl_multi_socket_action( g->multi, fd, mask, &g->still_running );
dbgmsg( "event_cb(): still_running is %d", g->still_running );
} while( rc == CURLM_CALL_MULTI_PERFORM );
if( rc != CURLM_OK )
tr_err( "%s", curl_multi_strerror( rc ) );
remove_finished_tasks( g );
add_tasks_from_queue( g );
if( !g->still_running ) {
stop_timer( g );
if( g->closing ) {
webDestroy( g );
closed = TRUE;
}
}
if( !closed )
restart_timer( g );
} }
/* libevent says that sock is ready to be processed, so wake up libcurl */ /* libevent says that sock is ready to be processed, so wake up libcurl */
static void static void
event_cb( int fd, short kind, void * vg ) event_cb( int fd, short kind, void * g )
{ {
tr_web * g = vg; int error;
CURLMcode rc;
int error = 0;
int mask; int mask;
socklen_t errsz = sizeof( error ); socklen_t errsz;
error = 0;
errsz = sizeof( error );
getsockopt( fd, SOL_SOCKET, SO_ERROR, &error, &errsz ); getsockopt( fd, SOL_SOCKET, SO_ERROR, &error, &errsz );
if( error ) if( error )
mask = CURL_CSELECT_ERR; mask = CURL_CSELECT_ERR;
@ -287,38 +307,15 @@ event_cb( int fd, short kind, void * vg )
if( kind & EV_WRITE ) mask |= CURL_CSELECT_OUT; if( kind & EV_WRITE ) mask |= CURL_CSELECT_OUT;
} }
do { tr_multi_socket_action( g, fd, mask );
dbgmsg( "event_cb calling socket_action fd %d, mask %d", fd, mask );
rc = curl_multi_socket_action( g->multi, fd, mask, &g->still_running );
dbgmsg( "event_cb(): still_running is %d", g->still_running );
} while( rc == CURLM_CALL_MULTI_PERFORM );
if( rc != CURLM_OK )
tr_err( "%s", curl_multi_strerror( rc ) );
reset_timer( g );
check_run_count( g );
} }
/* libevent says that timer_ms have passed, so wake up libcurl */ /* libevent says that timer_ms have passed, so wake up libcurl */
static void static void
timer_cb( int socket UNUSED, short action UNUSED, void * vg ) timer_cb( int socket UNUSED, short action UNUSED, void * g )
{ {
tr_web * g = vg;
CURLMcode rc;
dbgmsg( "libevent timer is done" ); dbgmsg( "libevent timer is done" );
tr_multi_socket_action( g, CURL_SOCKET_TIMEOUT, 0 );
do {
dbgmsg( "timer_cb calling CURL_SOCKET_TIMEOUT" );
rc = curl_multi_socket_action( g->multi, CURL_SOCKET_TIMEOUT, 0,
&g->still_running );
dbgmsg( "timer_cb(): still_running is %d", g->still_running );
} while( rc == CURLM_CALL_MULTI_PERFORM );
if( rc != CURLM_OK )
tr_err( "%s", curl_multi_strerror( rc ) );
reset_timer( g );
check_run_count( g );
} }
static void static void
@ -339,8 +336,8 @@ setsock( curl_socket_t sockfd,
struct tr_web_sockinfo * f ) struct tr_web_sockinfo * f )
{ {
const int kind = EV_PERSIST const int kind = EV_PERSIST
| (action & CURL_POLL_IN ? EV_READ : 0) | (( action & CURL_POLL_IN ) ? EV_READ : 0 )
| (action & CURL_POLL_OUT ? EV_WRITE : 0); | (( action & CURL_POLL_OUT ) ? EV_WRITE : 0 );
dbgmsg( "setsock: fd is %d, curl action is %d, libevent action is %d", sockfd, action, kind ); dbgmsg( "setsock: fd is %d, curl action is %d, libevent action is %d", sockfd, action, kind );
if( f->evset ) if( f->evset )
event_del( &f->ev ); event_del( &f->ev );
@ -383,13 +380,10 @@ sock_cb( CURL * e UNUSED,
} }
/* from libcurl documentation: "The timeout value returned in the long timeout /* from libcurl documentation: "If 0, it means you should proceed immediately
points to, is in number of milliseconds at this very moment. If 0, it means * without waiting for anything. If it returns -1, there's no timeout at all
you should proceed immediately without waiting for anything. If it * set ... (but) you must not wait too long (more than a few seconds perhaps)
returns -1, there's no timeout at all set. Note: if libcurl returns a -1 * before you call curl_multi_perform() again." */
timeout here, it just means that libcurl currently has no stored timeout
value. You must not wait too long (more than a few seconds perhaps) before
you call curl_multi_perform() again." */
static void static void
multi_timer_cb( CURLM *multi UNUSED, long timer_ms, void * g ) multi_timer_cb( CURLM *multi UNUSED, long timer_ms, void * g )
{ {
@ -398,7 +392,7 @@ multi_timer_cb( CURLM *multi UNUSED, long timer_ms, void * g )
timer_cb( 0, 0, g ); timer_cb( 0, 0, g );
timer_ms = DEFAULT_TIMER_MSEC; timer_ms = DEFAULT_TIMER_MSEC;
} }
reset_timer( g ); restart_timer( g );
} }
/**** /****
@ -448,7 +442,7 @@ tr_webInit( tr_session * session )
web = tr_new0( struct tr_web, 1 ); web = tr_new0( struct tr_web, 1 );
web->multi = curl_multi_init( ); web->multi = curl_multi_init( );
web->session = session; web->session = session;
web->timer_ms = DEFAULT_TIMER_MSEC; /* default value to overwrite in multi_timer_cb() */ web->timer_ms = DEFAULT_TIMER_MSEC; /* overwritten by multi_timer_cb() */
timeout_set( &web->timer_event, timer_cb, web ); timeout_set( &web->timer_event, timer_cb, web );
curl_multi_setopt( web->multi, CURLMOPT_SOCKETDATA, web ); curl_multi_setopt( web->multi, CURLMOPT_SOCKETDATA, web );
@ -467,7 +461,7 @@ tr_webClose( tr_web ** web_in )
if( web->still_running < 1 ) if( web->still_running < 1 )
webDestroy( web ); webDestroy( web );
else else
web->dying = 1; web->closing = 1;
} }
/***** /*****