(trunk libT) experimental fix for #1748: crash in event_queue_insert in thread1

This commit is contained in:
Charles Kerr 2009-02-05 22:00:21 +00:00
parent 46ff5598a7
commit 8572d81e00
1 changed files with 97 additions and 112 deletions

View File

@ -49,6 +49,13 @@ enum
} while( 0 )
#endif
struct tr_web_sockinfo
{
int fd;
tr_bool evset;
struct event ev;
};
struct tr_web
{
tr_bool closing;
@ -57,16 +64,67 @@ struct tr_web
long timer_ms;
CURLM * multi;
tr_session * session;
#if 0
tr_list * easy_queue;
#endif
struct event timer_event;
tr_list * fds;
};
/***
****
***/
static struct tr_web_sockinfo *
getSockinfo( tr_web * web, int fd, tr_bool createIfMissing )
{
tr_list * l = web->fds;
for( l=web->fds; l!=NULL; l=l->next ) {
struct tr_web_sockinfo * s = l->data;
if( s->fd == fd ) {
dbgmsg( "looked up sockinfo %p for fd %d", s, fd );
return s;
}
}
if( createIfMissing ) {
struct tr_web_sockinfo * s = tr_new0( struct tr_web_sockinfo, 1 );
s->fd = fd;
tr_list_prepend( &web->fds, s );
dbgmsg( "created sockinfo %p for fd %d... we now have %d sockinfos", s, fd, tr_list_size(web->fds) );
return s;
}
return NULL;
}
static void
clearSockinfoEvent( struct tr_web_sockinfo * s )
{
if( s && s->evset )
{
dbgmsg( "clearing libevent polling for sockinfo %p, fd %d", s, s->fd );
event_del( &s->ev );
s->evset = FALSE;
}
}
static void
purgeSockinfo( tr_web * web, int fd )
{
struct tr_web_sockinfo * s = getSockinfo( web, fd, FALSE );
if( s != NULL )
{
tr_list_remove_data( &web->fds, s );
clearSockinfoEvent( s );
dbgmsg( "freeing sockinfo %p, fd %d", s, s->fd );
tr_free( s );
}
}
/***
****
***/
struct tr_web_task
{
unsigned long tag;
@ -148,15 +206,6 @@ addTask( void * vtask )
else /* don't set encoding on webseeds; it messes up binary data */
curl_easy_setopt( easy, CURLOPT_ENCODING, "" );
#if 0
if( web->still_running >= MAX_CONCURRENT_TASKS )
{
tr_list_append( &web->easy_queue, easy );
dbgmsg( " >> enqueueing a task ... size is now %d",
tr_list_size( web->easy_queue ) );
}
else
#endif
{
const CURLMcode mcode = curl_multi_add_handle( web->multi, easy );
tr_assert( mcode == CURLM_OK, "curl_multi_add_handle() failed: %d (%s)", mcode, curl_multi_strerror( mcode ) );
@ -172,12 +221,6 @@ addTask( void * vtask )
****
***/
struct tr_web_sockinfo
{
struct event ev;
int evset;
};
static void
task_free( struct tr_web_task * task )
{
@ -263,29 +306,6 @@ restart_timer( tr_web * g )
evtimer_add( &g->timer_event, &interval );
}
#if 0
static void
add_tasks_from_queue( tr_web * g )
{
while( ( g->still_running < MAX_CONCURRENT_TASKS )
&& ( tr_list_size( g->easy_queue ) > 0 ) )
{
CURL * easy = tr_list_pop_front( &g->easy_queue );
if( easy )
{
const CURLMcode rc = curl_multi_add_handle( g->multi, easy );
if( rc != CURLM_OK )
tr_err( "%s", curl_multi_strerror( rc ) );
else {
dbgmsg( "pumped the task queue, %d remain",
tr_list_size( g->easy_queue ) );
++g->still_running;
}
}
}
}
#endif
static void
web_close( tr_web * g )
{
@ -318,17 +338,18 @@ tr_multi_socket_action( tr_web * g, int fd )
mcode = curl_multi_socket_action( g->multi, fd, 0, &g->still_running );
dbgmsg( "event_cb(): fd %d, still_running is %d", fd, g->still_running );
} while( mcode == CURLM_CALL_MULTI_PERFORM );
tr_assert( mcode == CURLM_OK, "curl_multi_socket_action() failed on fd %d: %d (%s)", fd, mcode, curl_multi_strerror( mcode ) );
if( mcode != CURLM_OK )
tr_err( "%s", curl_multi_strerror( mcode ) );
if( ( mcode == CURLM_BAD_SOCKET ) && ( fd != CURL_SOCKET_TIMEOUT ) )
purgeSockinfo( g, fd );
else {
tr_assert( mcode == CURLM_OK, "curl_multi_socket_action() failed on fd %d: %d (%s)", fd, mcode, curl_multi_strerror( mcode ) );
if( mcode != CURLM_OK )
tr_err( "%s", curl_multi_strerror( mcode ) );
}
remove_finished_tasks( g );
#if 0
add_tasks_from_queue( g );
#endif
if( !g->still_running ) {
assert( tr_list_size( g->fds ) == 0 );
stop_timer( g );
if( g->closing ) {
web_close( g );
@ -355,76 +376,40 @@ timer_cb( int socket UNUSED, short action UNUSED, void * g )
tr_multi_socket_action( g, CURL_SOCKET_TIMEOUT );
}
static void
remsock( struct tr_web_sockinfo * f )
{
if( f ) {
dbgmsg( "deleting sockinfo %p", f );
if( f->evset )
event_del( &f->ev );
tr_free( f );
}
}
static void
setsock( curl_socket_t sockfd,
int action,
struct tr_web * g,
struct tr_web_sockinfo * f )
{
const int kind = EV_PERSIST
| (( action & CURL_POLL_IN ) ? EV_READ : 0 )
| (( action & CURL_POLL_OUT ) ? EV_WRITE : 0 );
assert( tr_amInEventThread( g->session ) );
assert( g->session != NULL );
assert( g->session->events != NULL );
dbgmsg( "setsock: fd is %d, curl action is %d, libevent action is %d",
sockfd, action, kind );
if( f->evset )
event_del( &f->ev );
event_set( &f->ev, sockfd, kind, event_cb, g );
f->evset = 1;
event_add( &f->ev, NULL );
}
static void
addsock( curl_socket_t sockfd,
int action,
struct tr_web * g )
{
CURLMcode mcode;
struct tr_web_sockinfo * f;
f = tr_new0( struct tr_web_sockinfo, 1 );
dbgmsg( "creating a sockinfo %p for fd %d", f, sockfd );
setsock( sockfd, action, g, f );
mcode = curl_multi_assign( g->multi, sockfd, f );
tr_assert( mcode == CURLM_OK, "curl_multi_assign() failed: %d (%s)", mcode, curl_multi_strerror( mcode ) );
if( mcode != CURLM_OK )
tr_err( "%s", curl_multi_strerror( mcode ) );
}
/* CURLMOPT_SOCKETFUNCTION */
static int
sock_cb( CURL * e UNUSED,
curl_socket_t s,
int what,
void * vg,
void * vf)
curl_socket_t fd,
int action,
void * vweb,
void * unused UNUSED)
{
struct tr_web * g = vg;
struct tr_web_sockinfo * f = vf;
dbgmsg( "sock_cb: what is %d, sockinfo is %p", what, f );
struct tr_web * web = vweb;
dbgmsg( "sock_cb: action is %d, fd is %d", action, (int)fd );
if( what == CURL_POLL_REMOVE )
remsock( f );
else if( !f )
addsock( s, what, g );
if( action == CURL_POLL_REMOVE )
{
purgeSockinfo( web, fd );
}
else
setsock( s, what, g, f );
{
struct tr_web_sockinfo * sockinfo = getSockinfo( web, fd, TRUE );
const int kind = EV_PERSIST
| (( action & CURL_POLL_IN ) ? EV_READ : 0 )
| (( action & CURL_POLL_OUT ) ? EV_WRITE : 0 );
dbgmsg( "setsock: fd is %d, curl action is %d, libevent action is %d", fd, action, kind );
assert( tr_amInEventThread( web->session ) );
assert( kind != EV_PERSIST );
/* clear any old polling on this fd */
clearSockinfoEvent( sockinfo );
/* set the new polling on this fd */
dbgmsg( "enabling (libevent %d, libcurl %d) polling on sockinfo %p, fd %d", action, kind, sockinfo, fd );
event_set( &sockinfo->ev, fd, kind, event_cb, web );
event_add( &sockinfo->ev, NULL );
sockinfo->evset = TRUE;
}
return 0;
}