/* * This file Copyright (C) 2007-2008 Charles Kerr * * This file is licensed by the GPL version 2. Works owned by the * Transmission project are granted a special exemption to clause 2(b) * so that the bulk of its code can remain under the MIT license. * This exemption does not extend to derived works not owned by * the Transmission project. * * $Id$ */ #include #include #include #include #include #include #include #include "transmission.h" #include "list.h" #include "platform.h" #include "trevent.h" #include "utils.h" /* #define DEBUG */ #ifdef DEBUG #include #undef tr_dbg #define tr_dbg( a, b... ) fprintf(stderr, a "\n", ##b ) #endif /*** **** ***/ typedef struct tr_event_handle { tr_lock * lock; tr_handle * h; tr_thread * thread; tr_list * commands; struct event_base * base; struct event pulse; struct timeval pulseInterval; uint8_t die; int timerCount; } tr_event_handle; #ifdef DEBUG static int reads = 0; static int writes = 0; #endif enum mode { TR_EV_TIMER_ADD, TR_EV_EXEC }; typedef int timer_func(void*); struct tr_timer { struct event event; struct timeval tv; timer_func * func; void * user_data; struct tr_event_handle * eh; uint8_t inCallback; }; struct tr_event_command { int mode; struct tr_timer * timer; struct evhttp_connection * evcon; struct evhttp_request * req; enum evhttp_cmd_type evtype; char * uri; struct bufferevent * bufev; short enable; short disable; char * buf; size_t buflen; void (*func)( void* ); void * user_data; }; static void pumpList( int i UNUSED, short s UNUSED, void * veh ) { tr_event_handle * eh = veh; int doDie; for( ;; ) { struct tr_event_command * cmd; doDie = eh->die && !eh->timerCount; if( doDie ) break; /* get the next command */ tr_lockLock( eh->lock ); cmd = tr_list_pop_front( &eh->commands ); tr_lockUnlock( eh->lock ); if( cmd == NULL ) break; /* process the command */ switch( cmd->mode ) { case TR_EV_TIMER_ADD: evtimer_add( &cmd->timer->event, &cmd->timer->tv ); ++eh->timerCount; break; case TR_EV_EXEC: (cmd->func)( cmd->user_data ); break; default: assert( 0 && "unhandled command type!" ); } /* cleanup */ tr_free( cmd ); } if( !doDie ) evtimer_add( &eh->pulse, &eh->pulseInterval ); else { assert( eh->timerCount == 0 ); event_del( &eh->pulse ); } } static void logFunc( int severity, const char * message ) { if( severity >= _EVENT_LOG_ERR ) tr_nerr( "%s", message ); else tr_ndbg( "%s", message ); } static void libeventThreadFunc( void * veh ) { tr_event_handle * eh = (tr_event_handle *) veh; tr_dbg( "Starting libevent thread" ); #ifndef WIN32 /* Don't exit when writing on a broken socket */ signal( SIGPIPE, SIG_IGN ); #endif eh->base = event_init( ); event_set_log_callback( logFunc ); evtimer_set( &eh->pulse, pumpList, veh ); evtimer_add( &eh->pulse, &eh->pulseInterval ); eh->h->events = eh; event_dispatch( ); tr_lockFree( eh->lock ); event_base_free( eh->base ); eh->h->events = NULL; tr_free( eh ); tr_dbg( "Closing libevent thread" ); } void tr_eventInit( tr_handle * handle ) { tr_event_handle * eh; eh = tr_new0( tr_event_handle, 1 ); eh->lock = tr_lockNew( ); eh->h = handle; eh->pulseInterval = tr_timevalMsec( 100 ); eh->thread = tr_threadNew( libeventThreadFunc, eh, "libeventThreadFunc" ); } void tr_eventClose( tr_handle * handle ) { tr_event_handle * eh = handle->events; tr_lockLock( eh->lock ); tr_list_free( &eh->commands, tr_free ); eh->die = TRUE; tr_lockUnlock( eh->lock ); } /** *** **/ static void pushList( struct tr_event_handle * eh, struct tr_event_command * command ) { tr_lockLock( eh->lock ); tr_list_append( &eh->commands, command ); tr_lockUnlock( eh->lock ); } int tr_amInEventThread( struct tr_handle * handle ) { return tr_amInThread( handle->events->thread ); } /** *** **/ static int timerCompareFunc( const void * va, const void * vb ) { const struct tr_event_command * a = va; const struct tr_timer * b = vb; return a->timer == b ? 0 : 1; } static void timerCallback( int fd UNUSED, short event UNUSED, void * vtimer ) { int more; struct tr_timer * timer = vtimer; void * del; del = tr_list_remove( &timer->eh->commands, timer, timerCompareFunc ); if( del != NULL ) /* there's a TIMER_DEL command queued for this timer... */ more = FALSE; else { timer->inCallback = 1; more = (*timer->func)( timer->user_data ); timer->inCallback = 0; } if( more ) evtimer_add( &timer->event, &timer->tv ); else tr_timerFree( &timer ); tr_free( del ); } void tr_timerFree( tr_timer ** ptimer ) { tr_timer * timer; /* zero out the argument passed in */ assert( ptimer ); timer = *ptimer; *ptimer = NULL; /* destroy the timer directly or via the command queue */ if( timer!=NULL && !timer->inCallback ) { void * del; assert( tr_amInEventThread( timer->eh->h ) ); del = tr_list_remove( &timer->eh->commands, timer, timerCompareFunc ); --timer->eh->timerCount; event_del( &timer->event ); tr_free( timer ); tr_free( del ); } } tr_timer* tr_timerNew( struct tr_handle * handle, timer_func func, void * user_data, uint64_t interval_milliseconds ) { tr_timer * timer = tr_new0( tr_timer, 1 ); timer->tv = tr_timevalMsec( interval_milliseconds ); timer->func = func; timer->user_data = user_data; timer->eh = handle->events; evtimer_set( &timer->event, timerCallback, timer ); if( tr_amInThread( handle->events->thread ) ) { evtimer_add( &timer->event, &timer->tv ); ++handle->events->timerCount; } else { struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 ); cmd->mode = TR_EV_TIMER_ADD; cmd->timer = timer; pushList( handle->events, cmd ); } return timer; } void tr_runInEventThread( struct tr_handle * handle, void func( void* ), void * user_data ) { if( tr_amInThread( handle->events->thread ) ) (func)( user_data ); else { struct tr_event_command * cmd = tr_new0( struct tr_event_command, 1 ); cmd->mode = TR_EV_EXEC; cmd->func = func; cmd->user_data = user_data; pushList( handle->events, cmd ); } }