(trunk libT) #1748: possible fix for the kqueue corruption errors by consolidating the three per-torrent libevent timers into three session-wide timers. Since most people reporting this error have lots of torrents loaded, consider a hypothetical example: if you had 500 torrents, this patch will reduce 1,500 libevent timers down to just three timers. On top of that, those three have simpler life cycles too...

This commit is contained in:
Charles Kerr 2009-02-04 16:58:52 +00:00
parent 606fee54ab
commit 08289b9d13
8 changed files with 140 additions and 189 deletions

View File

@ -47,8 +47,7 @@ typedef enum
TR_PEER_PEER_PROGRESS,
TR_PEER_ERROR,
TR_PEER_CANCEL,
TR_PEER_UPLOAD_ONLY,
TR_PEER_NEED_REQ
TR_PEER_UPLOAD_ONLY
}
PeerEventType;

View File

@ -1,4 +1,3 @@
/*
* This file Copyright (C) 2007-2009 Charles Kerr <charles@transmissionbt.com>
*
@ -47,7 +46,13 @@ enum
RECHOKE_PERIOD_MSEC = ( 10 * 1000 ),
/* minimum interval for refilling peers' request lists */
REFILL_PERIOD_MSEC = 333,
REFILL_PERIOD_MSEC = 400,
/* how frequently to reallocate bandwidth */
BANDWIDTH_PERIOD_MSEC = 500,
/* how frequently to decide which peers live and die */
RECONNECT_PERIOD_MSEC = 500,
/* when many peers are available, keep idle ones this long */
MIN_UPLOAD_IDLE_SECS = ( 30 ),
@ -55,12 +60,6 @@ enum
/* when few peers are available, keep idle ones this long */
MAX_UPLOAD_IDLE_SECS = ( 60 * 5 ),
/* how frequently to decide which peers live and die */
RECONNECT_PERIOD_MSEC = ( 2 * 1000 ),
/* how frequently to reallocate bandwidth */
BANDWIDTH_PERIOD_MSEC = 500,
/* max # of peers to ask fer per torrent per reconnect pulse */
MAX_RECONNECTIONS_PER_PULSE = 16,
@ -74,6 +73,7 @@ enum
/* use for bitwise operations w/peer_atom.myflags */
MYFLAG_BANNED = 1,
/* use for bitwise operations w/peer_atom.myflags */
/* unreachable for now... but not banned.
* if they try to connect to us it's okay */
MYFLAG_UNREACHABLE = 2,
@ -126,9 +126,6 @@ typedef struct tr_torrent_peers
tr_ptrArray pool; /* struct peer_atom */
tr_ptrArray peers; /* tr_peer */
tr_ptrArray webseeds; /* tr_webseed */
tr_timer * reconnectTimer;
tr_timer * rechokeTimer;
tr_timer * refillTimer;
tr_torrent * tor;
tr_peer * optimistic; /* the optimistic peer, or NULL if none */
@ -141,6 +138,9 @@ struct tr_peerMgr
tr_session * session;
tr_ptrArray incomingHandshakes; /* tr_handshake */
tr_timer * bandwidthTimer;
tr_timer * rechokeTimer;
tr_timer * refillTimer;
tr_timer * reconnectTimer;
};
#define tordbg( t, ... ) \
@ -380,10 +380,6 @@ torrentDestructor( void * vt )
memcpy( hash, t->hash, SHA_DIGEST_LENGTH );
tr_timerFree( &t->reconnectTimer );
tr_timerFree( &t->rechokeTimer );
tr_timerFree( &t->refillTimer );
tr_ptrArrayDestruct( &t->webseeds, (PtrArrayForeachFunc)tr_webseedFree );
tr_ptrArrayDestruct( &t->pool, (PtrArrayForeachFunc)tr_free );
tr_ptrArrayDestruct( &t->outgoingHandshakes, NULL );
@ -424,8 +420,10 @@ torrentConstructor( tr_peerMgr * manager,
}
static int bandwidthPulse( void * vmgr );
static int bandwidthPulse ( void * vmgr );
static int rechokePulse ( void * vmgr );
static int refillPulse ( void * vmgr );
static int reconnectPulse ( void * vmgr );
tr_peerMgr*
tr_peerMgrNew( tr_session * session )
@ -435,6 +433,12 @@ tr_peerMgrNew( tr_session * session )
m->session = session;
m->incomingHandshakes = TR_PTR_ARRAY_INIT;
m->bandwidthTimer = tr_timerNew( session, bandwidthPulse, m, BANDWIDTH_PERIOD_MSEC );
m->rechokeTimer = tr_timerNew( session, rechokePulse, m, RECHOKE_PERIOD_MSEC );
m->refillTimer = tr_timerNew( session, refillPulse, m, REFILL_PERIOD_MSEC );
m->reconnectTimer = tr_timerNew( session, reconnectPulse, m, RECONNECT_PERIOD_MSEC );
rechokePulse( m );
return m;
}
@ -443,6 +447,9 @@ tr_peerMgrFree( tr_peerMgr * manager )
{
managerLock( manager );
tr_timerFree( &manager->reconnectTimer );
tr_timerFree( &manager->refillTimer );
tr_timerFree( &manager->rechokeTimer );
tr_timerFree( &manager->bandwidthTimer );
/* free the handshakes. Abort invokes handshakeDoneCB(), which removes
@ -727,8 +734,8 @@ getBlockOffsetInPiece( const tr_torrent * tor, uint64_t b )
return (uint32_t)( blockPos - piecePos );
}
static int
refillPulse( void * vtorrent )
static void
refillTorrent( Torrent * t )
{
tr_block_index_t block;
int peerCount;
@ -736,15 +743,13 @@ refillPulse( void * vtorrent )
tr_peer ** peers;
tr_webseed ** webseeds;
struct tr_blockIterator * blockIterator;
Torrent * t = vtorrent;
tr_torrent * tor = t->tor;
if( !t->isRunning )
return TRUE;
return;
if( tr_torrentIsSeed( t->tor ) )
return TRUE;
return;
torrentLock( t );
tordbg( t, "Refilling Request Buffers..." );
blockIterator = blockIteratorNew( t );
@ -816,10 +821,21 @@ refillPulse( void * vtorrent )
blockIteratorFree( blockIterator );
tr_free( webseeds );
tr_free( peers );
}
t->refillTimer = NULL;
torrentUnlock( t );
return FALSE;
static int
refillPulse( void * vmgr )
{
tr_torrent * tor = NULL;
tr_peerMgr * mgr = vmgr;
managerLock( mgr );
while(( tor = tr_torrentNext( mgr->session, tor )))
if( tor->isRunning && !tr_torrentIsSeed( tor ) )
refillTorrent( tor->torrentPeers );
managerUnlock( mgr );
return TRUE;
}
static void
@ -868,15 +884,6 @@ gotBadPiece( Torrent * t,
tor->downloadedCur -= MIN( tor->downloadedCur, byteCount );
}
static void
refillSoon( Torrent * t )
{
if( t->refillTimer == NULL )
t->refillTimer = tr_timerNew( t->manager->session,
refillPulse, t,
REFILL_PERIOD_MSEC );
}
static void
peerSuggestedPiece( Torrent * t UNUSED,
tr_peer * peer UNUSED,
@ -944,10 +951,6 @@ peerCallbackFunc( void * vpeer, void * vevent, void * vt )
}
break;
case TR_PEER_NEED_REQ:
refillSoon( t );
break;
case TR_PEER_CANCEL:
decrementPieceRequests( t, e->pieceIndex );
break;
@ -1506,42 +1509,10 @@ tr_peerMgrGetPeers( tr_torrent * tor,
return peersReturning;
}
static int reconnectPulse( void * vtorrent );
static int rechokePulse( void * vtorrent );
void
tr_peerMgrStartTorrent( tr_torrent * tor )
{
Torrent * t = tor->torrentPeers;
managerLock( t->manager );
assert( t );
assert( ( t->isRunning != 0 ) == ( t->reconnectTimer != NULL ) );
assert( ( t->isRunning != 0 ) == ( t->rechokeTimer != NULL ) );
if( !t->isRunning )
{
t->isRunning = 1;
t->reconnectTimer = tr_timerNew( t->manager->session,
reconnectPulse, t,
RECONNECT_PERIOD_MSEC );
t->rechokeTimer = tr_timerNew( t->manager->session,
rechokePulse, t,
RECHOKE_PERIOD_MSEC );
reconnectPulse( t );
rechokePulse( t );
if( !tr_ptrArrayEmpty( &t->webseeds ) )
refillSoon( t );
}
managerUnlock( t->manager );
tor->torrentPeers->isRunning = TRUE;
}
static void
@ -1549,9 +1520,7 @@ stopTorrent( Torrent * t )
{
assert( torrentIsLocked( t ) );
t->isRunning = 0;
tr_timerFree( &t->rechokeTimer );
tr_timerFree( &t->reconnectTimer );
t->isRunning = FALSE;
/* disconnect the peers. */
tr_ptrArrayForeach( &t->peers, (PtrArrayForeachFunc)peerDestructor );
@ -1867,7 +1836,7 @@ isSame( const tr_peer * peer )
**/
static void
rechoke( Torrent * t )
rechokeTorrent( Torrent * t )
{
int i, size, unchokedInterested;
const int peerCount = tr_ptrArraySize( &t->peers );
@ -1967,13 +1936,17 @@ rechoke( Torrent * t )
}
static int
rechokePulse( void * vtorrent )
rechokePulse( void * vmgr )
{
Torrent * t = vtorrent;
tr_torrent * tor = NULL;
tr_peerMgr * mgr = vmgr;
managerLock( mgr );
torrentLock( t );
rechoke( t );
torrentUnlock( t );
while(( tor = tr_torrentNext( mgr->session, tor )))
if( tor->isRunning )
rechokeTorrent( tor->torrentPeers );
managerUnlock( mgr );
return TRUE;
}
@ -2215,16 +2188,13 @@ closePeer( Torrent * t, tr_peer * peer )
removePeer( t, peer );
}
static int
reconnectPulse( void * vtorrent )
static void
reconnectTorrent( Torrent * t )
{
Torrent * t = vtorrent;
static time_t prevTime = 0;
static int newConnectionsThisSecond = 0;
time_t now;
torrentLock( t );
now = time( NULL );
if( prevTime != now )
{
@ -2326,8 +2296,20 @@ reconnectPulse( void * vtorrent )
tr_free( mustClose );
tr_free( canClose );
}
}
torrentUnlock( t );
static int
reconnectPulse( void * vmgr )
{
tr_torrent * tor = NULL;
tr_peerMgr * mgr = vmgr;
managerLock( mgr );
while(( tor = tr_torrentNext( mgr->session, tor )))
if( tor->isRunning )
reconnectTorrent( tor->torrentPeers );
managerUnlock( mgr );
return TRUE;
}

View File

@ -419,14 +419,6 @@ fireUploadOnly( tr_peermsgs * msgs, tr_bool uploadOnly )
publish( msgs, &e );
}
static void
fireNeedReq( tr_peermsgs * msgs )
{
tr_peer_event e = blankEvent;
e.eventType = TR_PEER_NEED_REQ;
publish( msgs, &e );
}
static void
firePeerProgress( tr_peermsgs * msgs )
{
@ -653,8 +645,6 @@ updateInterest( tr_peermsgs * msgs )
if( i != msgs->peer->clientIsInterested )
sendInterest( msgs, i );
if( i )
fireNeedReq( msgs );
}
static int
@ -819,9 +809,6 @@ pumpRequestQueue( tr_peermsgs * msgs, const time_t now )
if( sent )
dbgmsg( msgs, "pump sent %d requests, now have %d active and %d queued",
sent, msgs->clientAskedFor.len, msgs->clientWillAskFor.len );
if( len < max )
fireNeedReq( msgs );
}
static TR_INLINE int
@ -1366,7 +1353,6 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
case BT_UNCHOKE:
dbgmsg( msgs, "got Unchoke" );
msgs->peer->clientIsChoked = 0;
fireNeedReq( msgs );
break;
case BT_INTERESTED:
@ -1392,13 +1378,10 @@ readBtMessage( tr_peermsgs * msgs, struct evbuffer * inbuf, size_t inlen )
break;
case BT_BITFIELD:
{
dbgmsg( msgs, "got a bitfield" );
tr_peerIoReadBytes( msgs->peer->io, inbuf, msgs->peer->have->bits, msglen );
updatePeerProgress( msgs );
fireNeedReq( msgs );
break;
}
case BT_REQUEST:
{

View File

@ -359,28 +359,66 @@ tr_sessionSaveSettings( tr_session * session, const char * configDir, tr_benc *
static void metainfoLookupRescan( tr_session * );
static void tr_sessionInitImpl( void * );
struct init_data
{
tr_session * session;
const char * configDir;
tr_bool messageQueuingEnabled;
tr_benc * clientSettings;
};
tr_session *
tr_sessionInit( const char * tag,
const char * configDir,
tr_bool messageQueuingEnabled,
tr_benc * clientSettings )
{
int64_t i;
int64_t j;
tr_bool found;
const char * str;
tr_benc settings;
tr_session * session;
char * filename;
struct init_data data;
assert( tr_bencIsDict( clientSettings ) );
/* initialize the bare skeleton of the session object */
session = tr_new0( tr_session, 1 );
session->bandwidth = tr_bandwidthNew( session, NULL );
session->lock = tr_lockNew( );
session->tag = tr_strdup( tag );
session->magicNumber = SESSION_MAGIC_NUMBER;
/* start the libtransmission thread */
tr_netInit( ); /* must go before tr_eventInit */
tr_eventInit( session );
assert( session->events != NULL );
/* run the rest in the libtransmission thread */
session->isWaiting = TRUE;
data.session = session;
data.configDir = configDir;
data.messageQueuingEnabled = messageQueuingEnabled;
data.clientSettings = clientSettings;
tr_runInEventThread( session, tr_sessionInitImpl, &data );
while( session->isWaiting )
tr_wait( 100 );
return session;
}
static void
tr_sessionInitImpl( void * vdata )
{
int64_t i;
int64_t j;
tr_bool found;
const char * str;
tr_benc settings;
char * filename;
struct init_data * data = vdata;
tr_benc * clientSettings = data->clientSettings;
tr_session * session = data->session;
assert( tr_amInEventThread( session ) );
assert( tr_bencIsDict( clientSettings ) );
dbgmsg( "tr_sessionInit: the session's top-level bandwidth object is %p", session->bandwidth );
tr_bencInitDict( &settings, 0 );
@ -399,7 +437,7 @@ tr_sessionInit( const char * tag,
found = tr_bencDictFindInt( &settings, TR_PREFS_KEY_MSGLEVEL, &i );
assert( found );
tr_setMessageLevel( i );
tr_setMessageQueuing( messageQueuingEnabled );
tr_setMessageQueuing( data->messageQueuingEnabled );
found = tr_bencDictFindInt( &settings, TR_PREFS_KEY_PEX_ENABLED, &i );
@ -455,11 +493,9 @@ tr_sessionInit( const char * tag,
session->so_sndbuf = 1500 * 3; /* 3x MTU for most ethernet/wireless */
session->so_rcvbuf = 8192;
tr_setConfigDir( session, configDir );
tr_setConfigDir( session, data->configDir );
tr_netInit( ); /* must go before tr_eventInit */
tr_eventInit( session );
assert( session->events != NULL );
tr_trackerSessionInit( session );
session->peerMgr = tr_peerMgrNew( session );
@ -531,18 +567,6 @@ tr_sessionInit( const char * tag,
tr_bencFree( &settings );
session->isWaiting = TRUE;
tr_runInEventThread( session, tr_sessionInitImpl, session );
while( session->isWaiting )
tr_wait( 100 );
return session;
}
static void
tr_sessionInitImpl( void * vsession )
{
tr_session * session = vsession;
assert( tr_isSession( session ) );
/* first %s is the application name

View File

@ -841,17 +841,14 @@ struct tr_tracker_handle
static int trackerPulse( void * vsession );
static void
ensureGlobalsExist( tr_session * session )
void
tr_trackerSessionInit( tr_session * session )
{
if( session->tracker == NULL )
{
session->tracker = tr_new0( struct tr_tracker_handle, 1 );
session->tracker->pulseTimer =
tr_timerNew( session, trackerPulse, session,
PULSE_INTERVAL_MSEC );
dbgmsg( NULL, "creating tracker timer" );
}
assert( tr_isSession( session ) );
session->tracker = tr_new0( struct tr_tracker_handle, 1 );
session->tracker->pulseTimer = tr_timerNew( session, trackerPulse, session, PULSE_INTERVAL_MSEC );
dbgmsg( NULL, "creating tracker timer" );
}
void
@ -1035,8 +1032,6 @@ tr_trackerNew( const tr_torrent * torrent )
const tr_info * info = &torrent->info;
tr_tracker * t;
ensureGlobalsExist( torrent->session );
t = tr_new0( tr_tracker, 1 );
t->publisher = TR_PUBLISHER_INIT;
t->session = torrent->session;

View File

@ -31,6 +31,12 @@ tr_tracker * tr_trackerNew( const tr_torrent * );
void tr_trackerFree( tr_tracker * );
/**
***
**/
void tr_trackerSessionInit( tr_session * );
void tr_trackerSessionClose( tr_session * );
/**

View File

@ -191,19 +191,6 @@ readFromPipe( int fd,
break;
}
case 't': /* create timer */
{
tr_timer * timer;
const size_t nwant = sizeof( timer );
const ssize_t ngot = piperead( fd, &timer, nwant );
if( !eh->die && ( ngot == (ssize_t)nwant ) )
{
dbgmsg( "adding timer in libevent thread" );
evtimer_add( &timer->event, &timer->tv );
}
break;
}
case '\0': /* eof */
{
dbgmsg( "pipe eof reached... removing event listener" );
@ -294,7 +281,7 @@ tr_bool
tr_amInEventThread( tr_session * session )
{
assert( tr_isSession( session ) );
assert( session->events );
assert( session->events != NULL );
return tr_amInThread( session->events->thread );
}
@ -341,38 +328,23 @@ tr_timerFree( tr_timer ** ptimer )
}
tr_timer*
tr_timerNew( tr_session * session,
timer_func func,
void * user_data,
uint64_t interval_milliseconds )
tr_timerNew( tr_session * session,
timer_func func,
void * user_data,
uint64_t interval_milliseconds )
{
tr_timer * timer;
assert( tr_isSession( session ) );
assert( session->events != NULL );
assert( tr_amInEventThread( session ) );
timer = tr_new0( tr_timer, 1 );
tr_timevalMsec( interval_milliseconds, &timer->tv );
timer->func = func;
timer->user_data = user_data;
timer->eh = session->events;
tr_timevalMsec( interval_milliseconds, &timer->tv );
evtimer_set( &timer->event, timerCallback, timer );
if( tr_amInThread( session->events->thread ) )
{
evtimer_add( &timer->event, &timer->tv );
}
else
{
const char ch = 't';
int fd = session->events->fds[1];
tr_lock * lock = session->events->lock;
tr_lockLock( lock );
pipewrite( fd, &ch, 1 );
pipewrite( fd, &timer, sizeof( timer ) );
tr_lockUnlock( lock );
}
evtimer_add( &timer->event, &timer->tv );
return timer;
}

View File

@ -61,14 +61,6 @@ publish( tr_webseed * w,
w->callback( NULL, e, w->callback_userdata );
}
static void
fireNeedReq( tr_webseed * w )
{
tr_peer_event e = blankEvent;
e.eventType = TR_PEER_NEED_REQ;
publish( w, &e );
}
static void
fireClientGotBlock( tr_webseed * w, uint32_t pieceIndex, uint32_t offset, uint32_t length )
{
@ -180,10 +172,8 @@ webResponseFunc( tr_session * session,
w->busy = 0;
if( w->dead )
tr_webseedFree( w );
else {
else
fireClientGotBlock( w, w->pieceIndex, w->pieceOffset, w->byteCount );
fireNeedReq( w );
}
}
}
}