1
0
Fork 0
mirror of https://github.com/transmission/transmission synced 2024-12-24 08:43:27 +00:00

do all the libevent enqueuing from a single thread.

This commit is contained in:
Charles Kerr 2007-08-18 03:02:32 +00:00
parent 9f78eda964
commit 35b727788e
8 changed files with 271 additions and 38 deletions

View file

@ -36,6 +36,7 @@ libtransmission_a_SOURCES = \
torrent.c \
tracker.c \
transmission.c \
trevent.c \
upnp.c \
utils.c \
xml.c
@ -75,6 +76,7 @@ noinst_HEADERS = \
tracker.h \
transmission.h \
trcompat.h \
trevent.h \
upnp.h \
utils.h \
xml.h

View file

@ -172,6 +172,8 @@ struct tr_torrent_s
struct tr_handle_s
{
struct tr_event_handle_s * events;
int torrentCount;
tr_torrent_t * torrentList;

View file

@ -14,6 +14,7 @@
#include <event.h>
#include "transmission.h"
#include "trevent.h"
#include "timer.h"
#include "utils.h"
@ -80,7 +81,8 @@ timerCB( int fd UNUSED, short event UNUSED, void * arg )
tr_timer_tag
tr_timerNew( tr_timer_func func,
tr_timerNew( tr_handle_t * handle,
tr_timer_func func,
void * user_data,
tr_data_free_func free_func,
int timeout_milliseconds )
@ -99,6 +101,6 @@ tr_timerNew( tr_timer_func func,
node->tv.tv_sec = microseconds / 1000000;
node->tv.tv_usec = microseconds % 1000000;
timeout_set( &node->event, timerCB, node );
timeout_add( &node->event, &node->tv );
tr_event_add( handle, &node->event, &node->tv );
return node;
}

View file

@ -23,10 +23,11 @@ typedef struct timer_node * tr_timer_tag;
* zero or from a client call to tr_timerFree). This is useful
* if user_data has resources that need to be freed.
*/
tr_timer_tag tr_timerNew( int timer_func( void * user_data ),
void * user_data,
void free_func( void * user_data ),
int timeout_milliseconds );
tr_timer_tag tr_timerNew( struct tr_handle_s * handle,
int timer_func( void * user_data ),
void * user_data,
void free_func( void * user_data ),
int timeout_milliseconds );
/**
* Frees a timer and sets its tag to NULL.

View file

@ -28,6 +28,7 @@
#include "publish.h"
#include "timer.h"
#include "tracker.h"
#include "trevent.h"
#include "utils.h"
#define MINUTES_TO_MSEC(N) ((N) * 60 * 1000)
@ -60,6 +61,8 @@
typedef struct
{
tr_handle_t * handle;
tr_ptrArray_t * torrents;
tr_ptrArray_t * scraping;
tr_ptrArray_t * scrapeQueue;
@ -228,12 +231,13 @@ tr_trackerScrapeSoon( Tracker * t )
return;
if( !t->scrapeTag )
t->scrapeTag = tr_timerNew( onTrackerScrapeNow, t, NULL, 1000 );
t->scrapeTag = tr_timerNew( t->handle, onTrackerScrapeNow, t, NULL, 1000 );
}
static Tracker*
tr_trackerGet( const tr_info_t * info )
tr_trackerGet( const tr_torrent_t * tor )
{
const tr_info_t * info = &tor->info;
tr_ptrArray_t * trackers = getTrackerLookupTable( );
Tracker *t, tmp;
assert( info != NULL );
@ -250,6 +254,7 @@ tr_trackerGet( const tr_info_t * info )
tr_dbg( "making a new tracker for \"%s\"", info->primaryAddress );
t = tr_new0( Tracker, 1 );
t->handle = tor->handle;
t->primaryAddress = tr_strdup( info->primaryAddress );
t->scrapeIntervalMsec = DEFAULT_SCRAPE_INTERVAL_MSEC;
t->announceIntervalMsec = DEFAULT_ANNOUNCE_INTERVAL_MSEC;
@ -353,7 +358,7 @@ Torrent*
tr_trackerNew( tr_torrent_t * torrent )
{
Torrent * tor;
Tracker * t = tr_trackerGet( &torrent->info );
Tracker * t = tr_trackerGet( torrent );
assert( getExistingTorrent( t, torrent->info.hash ) == NULL );
/* create a new Torrent and queue it for scraping */
@ -396,6 +401,8 @@ updateAddresses( Tracker * t, const struct evhttp_request * req )
if( !req )
{
tr_inf( "Connecting to %s got a NULL response",
t->addresses[t->addressIndex].announce );
moveToNextAddress = TRUE;
}
else if( req->response_code == HTTP_OK )
@ -548,7 +555,8 @@ onScrapeResponse( struct evhttp_request * req, void * vt )
tr_ptrArrayRemoveSorted( t->scraping, tor, torrentCompare );
assert( !tor->scrapeTag );
tor->scrapeTag = tr_timerNew( onTorrentScrapeNow,
tor->scrapeTag = tr_timerNew( t->handle,
onTorrentScrapeNow,
tor, NULL,
t->scrapeIntervalMsec );
tr_dbg( "torrent '%s' scraped. re-scraping in %d seconds",
@ -656,9 +664,7 @@ onTrackerScrapeNow( void * vt )
req = evhttp_request_new( onScrapeResponse, t );
assert( req );
addCommonHeaders( t, req );
evhttp_make_request( evcon, req, EVHTTP_REQ_GET, uri );
tr_free( uri );
tr_evhttp_make_request( t->handle, evcon, req, EVHTTP_REQ_GET, uri );
}
return FALSE;
@ -868,7 +874,8 @@ onTrackerResponse( struct evhttp_request * req, void * vtor )
else if( reannounceInterval > 0 ) {
tr_inf( "torrent '%s' reannouncing in %d seconds",
tor->torrent->info.name, (reannounceInterval/1000) );
tor->reannounceTag = tr_timerNew( onReannounceNow, tor, NULL,
tor->reannounceTag = tr_timerNew( tor->tracker->handle,
onReannounceNow, tor, NULL,
reannounceInterval );
tor->manualAnnounceAllowedAt
= tr_date() + MANUAL_ANNOUNCE_INTERVAL_MSEC;
@ -898,7 +905,7 @@ sendTrackerRequest( void * vtor, const char * eventName )
evhttp_connection_set_timeout( evcon, REQ_TIMEOUT_INTERVAL_SEC );
tor->httpReq = evhttp_request_new( onTrackerResponse, tor );
addCommonHeaders( tor->tracker, tor->httpReq );
evhttp_make_request( evcon, tor->httpReq, EVHTTP_REQ_GET, uri );
tr_evhttp_make_request( tor->tracker->handle, evcon, tor->httpReq, EVHTTP_REQ_GET, uri );
}
tr_free( uri );

View file

@ -33,8 +33,6 @@
#include <unistd.h> /* stat */
#include <dirent.h> /* opendir */
#include <event.h>
#include "transmission.h"
#include "fdlimit.h"
#include "list.h"
@ -42,6 +40,7 @@
#include "platform.h"
#include "ratecontrol.h"
#include "shared.h"
#include "trevent.h"
#include "utils.h"
/* Generate a peer id : "-TRxyzb-" + 12 random alphanumeric
@ -63,20 +62,9 @@ tr_peerIdNew ( char * buf, int buflen )
buf[TR_ID_LEN] = '\0';
}
static int shuttingDown = FALSE;
static void
libeventThreadFunc( void * unused UNUSED )
{
tr_dbg( "libevent thread starting" );
event_init( );
while( !shuttingDown )
{
event_dispatch( );
tr_wait( 50 ); /* 1/20th of a second */
}
tr_dbg( "libevent thread exiting" );
}
/***
****
***/
/***********************************************************************
@ -90,14 +78,15 @@ tr_handle_t * tr_init( const char * tag )
int i;
tr_msgInit();
tr_threadNew( libeventThreadFunc, NULL, "libeventThreadFunc" );
tr_netInit();
tr_netResolveThreadInit();
h = calloc( 1, sizeof( tr_handle_t ) );
h = tr_new0( tr_handle_t, 1 );
if( !h )
return NULL;
tr_eventInit( h );
tr_netInit();
tr_netResolveThreadInit();
h->tag = strdup( tag );
if( !h->tag ) {
free( h );
@ -248,9 +237,7 @@ void tr_close( tr_handle_t * h )
free( h );
tr_netResolveThreadClose();
/* end the event thread */
shuttingDown = TRUE;
tr_eventClose( h );
}
tr_torrent_t **

194
libtransmission/trevent.c Normal file
View file

@ -0,0 +1,194 @@
/*
* This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
*
* This file is licensed by the GPL version 2. Works owned by the
* Transmission project are granted a special exemption to clause 2(b)
* so that the bulk of its code can remain under the MIT license.
* This exemption does not extend to derived works not owned by
* the Transmission project.
*/
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/queue.h> /* for evhttp */
#include <sys/types.h> /* for evhttp */
#ifdef WIN32
#include <fcntl.h>
#define pipe(f) _pipe(f, 1000, _O_BINARY)
#else
#include <unistd.h>
#endif
#include <event.h>
#include <evhttp.h>
#include "transmission.h"
#include "platform.h"
#include "utils.h"
//#define DEBUG
#ifdef DEBUG
#undef tr_dbg
#define tr_dbg( a, b... ) fprintf(stderr, a "\n", ##b )
#endif
/***
****
***/
typedef struct tr_event_handle_s
{
int fds[2];
int isShuttingDown;
tr_lock_t * lock;
tr_handle_t * h;
struct event pipeEvent;
}
tr_event_handle_t;
#ifdef DEBUG
static int reads = 0;
static int writes = 0;
#endif
void
readFromPipe( int fd, short eventType UNUSED, void * unused UNUSED )
{
char ch;
int ret;
struct event * event;
struct timeval interval;
struct evhttp_connection * evcon;
struct evhttp_request * req;
enum evhttp_cmd_type type;
const char * uri;
static char * buf = NULL;
#ifdef DEBUG
fprintf( stderr, "reading...reads: [%d] writes: [%d]\n", ++reads, writes );
#endif
if( !buf )
buf = tr_new0( char, sizeof(void*) );
ch = '\0';
do {
ret = read( fd, &ch, 1 );
} while ( ret<0 && errno==EAGAIN );
if( ret < 0 )
{
tr_err( "Couldn't read from libevent pipe: %s", strerror(errno) );
}
else switch( ch )
{
case 'e': /* event_add */
event = tr_new0( struct event, 1 );
read( fd, event, sizeof(struct event) );
read( fd, &interval, sizeof(struct timeval) );
tr_dbg( "read event from pipe: event.ev_arg is %p", event->ev_arg );
event_add( event, &interval );
break;
case 'h': /* http_make_request */
ret = read( fd, &evcon, sizeof(struct evhttp_connection*) );
read( fd, &req, sizeof(struct evhttp_request*) );
read( fd, &type, sizeof(enum evhttp_cmd_type) );
read( fd, &uri, sizeof(char*) );
tr_dbg( "read http req from pipe: req.cb_arg is %p", req->cb_arg );
evhttp_make_request( evcon, req, type, uri );
break;
default:
assert( 0 && "unhandled event pipe condition!" );
}
}
static void
libeventThreadFunc( void * veh )
{
tr_event_handle_t * eh = (tr_event_handle_t *) veh;
tr_dbg( "libevent thread starting" );
event_init( );
/* listen to the pipe's read fd */
event_set( &eh->pipeEvent, eh->fds[0], EV_READ|EV_PERSIST, readFromPipe, NULL );
event_add( &eh->pipeEvent, NULL );
while( !eh->isShuttingDown ) {
event_dispatch( );
tr_wait( 50 ); /* 1/20th of a second */
}
tr_dbg( "libevent thread exiting" );
}
void
tr_eventInit( tr_handle_t * handle )
{
tr_event_handle_t * eh;
eh = tr_new0( tr_event_handle_t, 1 );
eh->lock = tr_lockNew( );
pipe( eh->fds );
eh->h = handle;
handle->events = eh;
tr_threadNew( libeventThreadFunc, eh, "libeventThreadFunc" );
}
void
tr_eventClose( tr_handle_t * handle UNUSED )
{
fprintf( stderr, "%s:%d FIXME\n", __FILE__, __LINE__ );
}
void
tr_event_add (tr_handle_t * handle,
struct event * event,
struct timeval * interval )
{
const char ch = 'e';
int fd = handle->events->fds[1];
tr_lock_t * lock = handle->events->lock;
tr_lockLock( lock );
tr_dbg( "writing event to pipe: event.ev_arg is %p", event->ev_arg );
#ifdef DEBUG
fprintf( stderr, "reads: [%d] writes: [%d]\n", reads, ++writes );
#endif
write( fd, &ch, 1 );
write( fd, event, sizeof(struct event) );
write( fd, interval, sizeof(struct timeval) );
tr_lockUnlock( lock );
}
void
tr_evhttp_make_request (tr_handle_t * handle,
struct evhttp_connection * evcon,
struct evhttp_request * req,
enum evhttp_cmd_type type,
const char * uri)
{
const char ch = 'h';
int fd = handle->events->fds[1];
tr_lock_t * lock = handle->events->lock;
tr_lockLock( lock );
tr_dbg( "writing HTTP req to pipe: req.cb_arg is %p", req->cb_arg );
#ifdef DEBUG
fprintf( stderr, "reads: [%d] writes: [%d]\n", reads, ++writes );
#endif
write( fd, &ch, 1 );
write( fd, &evcon, sizeof(struct evhttp_connection*) );
write( fd, &req, sizeof(struct evhttp_request*) );
write( fd, &type, sizeof(enum evhttp_cmd_type) );
write( fd, &uri, sizeof(char*) );
tr_lockUnlock( lock );
}

38
libtransmission/trevent.h Normal file
View file

@ -0,0 +1,38 @@
/*
* This file Copyright (C) 2007 Charles Kerr <charles@rebelbase.com>
*
* This file is licensed by the GPL version 2. Works owned by the
* Transmission project are granted a special exemption to clause 2(b)
* so that the bulk of its code can remain under the MIT license.
* This exemption does not extend to derived works not owned by
* the Transmission project.
*/
#ifndef TR_EVENT_H
/**
**/
extern void tr_eventInit( struct tr_handle_s * tr_handle );
extern void tr_eventClose( struct tr_handle_s * tr_handle );
/**
**/
struct event;
enum evhttp_cmd_type;
struct evhttp_request;
struct evhttp_connection;
extern void tr_event_add( struct tr_handle_s * tr_handle,
struct event * event,
struct timeval * interval );
extern void tr_evhttp_make_request (struct tr_handle_s * tr_handle,
struct evhttp_connection * evcon,
struct evhttp_request * req,
enum evhttp_cmd_type type,
const char * uri);
#endif